blob: e4ad904ed4d0e8c49b00f358122e798f853eb50e [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Antoine Pitroude911b22011-12-21 11:03:24 +010011import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020015import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import signal
17import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import socket
19import random
20import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000021import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000022
Benjamin Petersone5384b02008-10-04 22:00:42 +000023
R. David Murraya21e4ca2009-03-31 23:16:50 +000024# Skip tests if _multiprocessing wasn't built.
25_multiprocessing = test.support.import_module('_multiprocessing')
26# Skip tests if sem_open implementation is broken.
27test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000028# import threading after _multiprocessing to raise a more revelant error
29# message: "No module named _multiprocessing". _multiprocessing is not compiled
30# without thread support.
31import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000032
Benjamin Petersone711caf2008-06-11 16:44:04 +000033import multiprocessing.dummy
34import multiprocessing.connection
35import multiprocessing.managers
36import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000037import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000038
Charles-François Natalibc8f0822011-09-20 20:36:51 +020039from multiprocessing import util
40
41try:
42 from multiprocessing import reduction
43 HAS_REDUCTION = True
44except ImportError:
45 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000046
Brian Curtinafa88b52010-10-07 01:12:19 +000047try:
48 from multiprocessing.sharedctypes import Value, copy
49 HAS_SHAREDCTYPES = True
50except ImportError:
51 HAS_SHAREDCTYPES = False
52
Antoine Pitroubcb39d42011-08-23 19:46:22 +020053try:
54 import msvcrt
55except ImportError:
56 msvcrt = None
57
Benjamin Petersone711caf2008-06-11 16:44:04 +000058#
59#
60#
61
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000062def latin(s):
63 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000064
Benjamin Petersone711caf2008-06-11 16:44:04 +000065#
66# Constants
67#
68
69LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000070#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000071
72DELTA = 0.1
73CHECK_TIMINGS = False # making true makes tests take a lot longer
74 # and can sometimes cause some non-serious
75 # failures because some calls block a bit
76 # longer than expected
77if CHECK_TIMINGS:
78 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
79else:
80 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
81
82HAVE_GETVALUE = not getattr(_multiprocessing,
83 'HAVE_BROKEN_SEM_GETVALUE', False)
84
Jesse Noller6214edd2009-01-19 16:23:53 +000085WIN32 = (sys.platform == "win32")
Antoine Pitrou176f07d2011-06-06 19:35:31 +020086
Richard Oudkerk59d54042012-05-10 16:11:12 +010087from multiprocessing.connection import wait
Antoine Pitrou176f07d2011-06-06 19:35:31 +020088
Richard Oudkerk59d54042012-05-10 16:11:12 +010089def wait_for_handle(handle, timeout):
90 if timeout is not None and timeout < 0.0:
91 timeout = None
92 return wait([handle], timeout)
Jesse Noller6214edd2009-01-19 16:23:53 +000093
Antoine Pitroubcb39d42011-08-23 19:46:22 +020094try:
95 MAXFD = os.sysconf("SC_OPEN_MAX")
96except:
97 MAXFD = 256
98
Benjamin Petersone711caf2008-06-11 16:44:04 +000099#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000100# Some tests require ctypes
101#
102
103try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000104 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000105except ImportError:
106 Structure = object
107 c_int = c_double = None
108
Charles-François Natali221ef672011-11-22 18:55:22 +0100109
110def check_enough_semaphores():
111 """Check that the system supports enough semaphores to run the test."""
112 # minimum number of semaphores available according to POSIX
113 nsems_min = 256
114 try:
115 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
116 except (AttributeError, ValueError):
117 # sysconf not available or setting not available
118 return
119 if nsems == -1 or nsems >= nsems_min:
120 return
121 raise unittest.SkipTest("The OS doesn't support enough semaphores "
122 "to run the test (required: %d)." % nsems_min)
123
124
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000125#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000126# Creates a wrapper for a function which records the time it takes to finish
127#
128
129class TimingWrapper(object):
130
131 def __init__(self, func):
132 self.func = func
133 self.elapsed = None
134
135 def __call__(self, *args, **kwds):
136 t = time.time()
137 try:
138 return self.func(*args, **kwds)
139 finally:
140 self.elapsed = time.time() - t
141
142#
143# Base class for test cases
144#
145
146class BaseTestCase(object):
147
148 ALLOWED_TYPES = ('processes', 'manager', 'threads')
149
150 def assertTimingAlmostEqual(self, a, b):
151 if CHECK_TIMINGS:
152 self.assertAlmostEqual(a, b, 1)
153
154 def assertReturnsIfImplemented(self, value, func, *args):
155 try:
156 res = func(*args)
157 except NotImplementedError:
158 pass
159 else:
160 return self.assertEqual(value, res)
161
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000162 # For the sanity of Windows users, rather than crashing or freezing in
163 # multiple ways.
164 def __reduce__(self, *args):
165 raise NotImplementedError("shouldn't try to pickle a test case")
166
167 __reduce_ex__ = __reduce__
168
Benjamin Petersone711caf2008-06-11 16:44:04 +0000169#
170# Return the value of a semaphore
171#
172
173def get_value(self):
174 try:
175 return self.get_value()
176 except AttributeError:
177 try:
178 return self._Semaphore__value
179 except AttributeError:
180 try:
181 return self._value
182 except AttributeError:
183 raise NotImplementedError
184
185#
186# Testcases
187#
188
189class _TestProcess(BaseTestCase):
190
191 ALLOWED_TYPES = ('processes', 'threads')
192
193 def test_current(self):
194 if self.TYPE == 'threads':
195 return
196
197 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000198 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000199
200 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000201 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000202 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000203 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000204 self.assertEqual(current.ident, os.getpid())
205 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000206
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000207 def test_daemon_argument(self):
208 if self.TYPE == "threads":
209 return
210
211 # By default uses the current process's daemon flag.
212 proc0 = self.Process(target=self._test)
Antoine Pitrouec785222011-03-02 00:15:44 +0000213 self.assertEqual(proc0.daemon, self.current_process().daemon)
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000214 proc1 = self.Process(target=self._test, daemon=True)
215 self.assertTrue(proc1.daemon)
216 proc2 = self.Process(target=self._test, daemon=False)
217 self.assertFalse(proc2.daemon)
218
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000219 @classmethod
220 def _test(cls, q, *args, **kwds):
221 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000222 q.put(args)
223 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000224 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000225 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000226 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000227 q.put(current.pid)
228
229 def test_process(self):
230 q = self.Queue(1)
231 e = self.Event()
232 args = (q, 1, 2)
233 kwargs = {'hello':23, 'bye':2.54}
234 name = 'SomeProcess'
235 p = self.Process(
236 target=self._test, args=args, kwargs=kwargs, name=name
237 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000238 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000239 current = self.current_process()
240
241 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000242 self.assertEqual(p.authkey, current.authkey)
243 self.assertEqual(p.is_alive(), False)
244 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000245 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000246 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000247 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000248
249 p.start()
250
Ezio Melottib3aedd42010-11-20 19:04:17 +0000251 self.assertEqual(p.exitcode, None)
252 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000253 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000254
Ezio Melottib3aedd42010-11-20 19:04:17 +0000255 self.assertEqual(q.get(), args[1:])
256 self.assertEqual(q.get(), kwargs)
257 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000258 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000259 self.assertEqual(q.get(), current.authkey)
260 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000261
262 p.join()
263
Ezio Melottib3aedd42010-11-20 19:04:17 +0000264 self.assertEqual(p.exitcode, 0)
265 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000266 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000267
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000268 @classmethod
269 def _test_terminate(cls):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000270 time.sleep(1000)
271
272 def test_terminate(self):
273 if self.TYPE == 'threads':
274 return
275
276 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000277 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000278 p.start()
279
280 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000281 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000282 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000283
Richard Oudkerk59d54042012-05-10 16:11:12 +0100284 join = TimingWrapper(p.join)
285
286 self.assertEqual(join(0), None)
287 self.assertTimingAlmostEqual(join.elapsed, 0.0)
288 self.assertEqual(p.is_alive(), True)
289
290 self.assertEqual(join(-1), None)
291 self.assertTimingAlmostEqual(join.elapsed, 0.0)
292 self.assertEqual(p.is_alive(), True)
293
Benjamin Petersone711caf2008-06-11 16:44:04 +0000294 p.terminate()
295
Benjamin Petersone711caf2008-06-11 16:44:04 +0000296 self.assertEqual(join(), None)
297 self.assertTimingAlmostEqual(join.elapsed, 0.0)
298
299 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000300 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000301
302 p.join()
303
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000304 # XXX sometimes get p.exitcode == 0 on Windows ...
305 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000306
307 def test_cpu_count(self):
308 try:
309 cpus = multiprocessing.cpu_count()
310 except NotImplementedError:
311 cpus = 1
312 self.assertTrue(type(cpus) is int)
313 self.assertTrue(cpus >= 1)
314
315 def test_active_children(self):
316 self.assertEqual(type(self.active_children()), list)
317
318 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000319 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000320
Jesus Cea94f964f2011-09-09 20:26:57 +0200321 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000322 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000323 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000324
325 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000326 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000327
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000328 @classmethod
329 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000330 from multiprocessing import forking
331 wconn.send(id)
332 if len(id) < 2:
333 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000334 p = cls.Process(
335 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000336 )
337 p.start()
338 p.join()
339
340 def test_recursion(self):
341 rconn, wconn = self.Pipe(duplex=False)
342 self._test_recursion(wconn, [])
343
344 time.sleep(DELTA)
345 result = []
346 while rconn.poll():
347 result.append(rconn.recv())
348
349 expected = [
350 [],
351 [0],
352 [0, 0],
353 [0, 1],
354 [1],
355 [1, 0],
356 [1, 1]
357 ]
358 self.assertEqual(result, expected)
359
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200360 @classmethod
361 def _test_sentinel(cls, event):
362 event.wait(10.0)
363
364 def test_sentinel(self):
365 if self.TYPE == "threads":
366 return
367 event = self.Event()
368 p = self.Process(target=self._test_sentinel, args=(event,))
369 with self.assertRaises(ValueError):
370 p.sentinel
371 p.start()
372 self.addCleanup(p.join)
373 sentinel = p.sentinel
374 self.assertIsInstance(sentinel, int)
375 self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
376 event.set()
377 p.join()
378 self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
379
Benjamin Petersone711caf2008-06-11 16:44:04 +0000380#
381#
382#
383
384class _UpperCaser(multiprocessing.Process):
385
386 def __init__(self):
387 multiprocessing.Process.__init__(self)
388 self.child_conn, self.parent_conn = multiprocessing.Pipe()
389
390 def run(self):
391 self.parent_conn.close()
392 for s in iter(self.child_conn.recv, None):
393 self.child_conn.send(s.upper())
394 self.child_conn.close()
395
396 def submit(self, s):
397 assert type(s) is str
398 self.parent_conn.send(s)
399 return self.parent_conn.recv()
400
401 def stop(self):
402 self.parent_conn.send(None)
403 self.parent_conn.close()
404 self.child_conn.close()
405
406class _TestSubclassingProcess(BaseTestCase):
407
408 ALLOWED_TYPES = ('processes',)
409
410 def test_subclassing(self):
411 uppercaser = _UpperCaser()
Jesus Cea94f964f2011-09-09 20:26:57 +0200412 uppercaser.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000413 uppercaser.start()
414 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
415 self.assertEqual(uppercaser.submit('world'), 'WORLD')
416 uppercaser.stop()
417 uppercaser.join()
418
Antoine Pitrou84a0fbf2012-01-27 10:52:37 +0100419 def test_stderr_flush(self):
420 # sys.stderr is flushed at process shutdown (issue #13812)
421 if self.TYPE == "threads":
422 return
423
424 testfn = test.support.TESTFN
425 self.addCleanup(test.support.unlink, testfn)
426 proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
427 proc.start()
428 proc.join()
429 with open(testfn, 'r') as f:
430 err = f.read()
431 # The whole traceback was printed
432 self.assertIn("ZeroDivisionError", err)
433 self.assertIn("test_multiprocessing.py", err)
434 self.assertIn("1/0 # MARKER", err)
435
436 @classmethod
437 def _test_stderr_flush(cls, testfn):
438 sys.stderr = open(testfn, 'w')
439 1/0 # MARKER
440
441
Richard Oudkerk29471de2012-06-06 19:04:57 +0100442 @classmethod
443 def _test_sys_exit(cls, reason, testfn):
444 sys.stderr = open(testfn, 'w')
445 sys.exit(reason)
446
447 def test_sys_exit(self):
448 # See Issue 13854
449 if self.TYPE == 'threads':
450 return
451
452 testfn = test.support.TESTFN
453 self.addCleanup(test.support.unlink, testfn)
454
455 for reason, code in (([1, 2, 3], 1), ('ignore this', 0)):
456 p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
457 p.daemon = True
458 p.start()
459 p.join(5)
460 self.assertEqual(p.exitcode, code)
461
462 with open(testfn, 'r') as f:
463 self.assertEqual(f.read().rstrip(), str(reason))
464
465 for reason in (True, False, 8):
466 p = self.Process(target=sys.exit, args=(reason,))
467 p.daemon = True
468 p.start()
469 p.join(5)
470 self.assertEqual(p.exitcode, reason)
471
Benjamin Petersone711caf2008-06-11 16:44:04 +0000472#
473#
474#
475
476def queue_empty(q):
477 if hasattr(q, 'empty'):
478 return q.empty()
479 else:
480 return q.qsize() == 0
481
482def queue_full(q, maxsize):
483 if hasattr(q, 'full'):
484 return q.full()
485 else:
486 return q.qsize() == maxsize
487
488
489class _TestQueue(BaseTestCase):
490
491
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000492 @classmethod
493 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000494 child_can_start.wait()
495 for i in range(6):
496 queue.get()
497 parent_can_continue.set()
498
499 def test_put(self):
500 MAXSIZE = 6
501 queue = self.Queue(maxsize=MAXSIZE)
502 child_can_start = self.Event()
503 parent_can_continue = self.Event()
504
505 proc = self.Process(
506 target=self._test_put,
507 args=(queue, child_can_start, parent_can_continue)
508 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000509 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000510 proc.start()
511
512 self.assertEqual(queue_empty(queue), True)
513 self.assertEqual(queue_full(queue, MAXSIZE), False)
514
515 queue.put(1)
516 queue.put(2, True)
517 queue.put(3, True, None)
518 queue.put(4, False)
519 queue.put(5, False, None)
520 queue.put_nowait(6)
521
522 # the values may be in buffer but not yet in pipe so sleep a bit
523 time.sleep(DELTA)
524
525 self.assertEqual(queue_empty(queue), False)
526 self.assertEqual(queue_full(queue, MAXSIZE), True)
527
528 put = TimingWrapper(queue.put)
529 put_nowait = TimingWrapper(queue.put_nowait)
530
531 self.assertRaises(pyqueue.Full, put, 7, False)
532 self.assertTimingAlmostEqual(put.elapsed, 0)
533
534 self.assertRaises(pyqueue.Full, put, 7, False, None)
535 self.assertTimingAlmostEqual(put.elapsed, 0)
536
537 self.assertRaises(pyqueue.Full, put_nowait, 7)
538 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
539
540 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
541 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
542
543 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
544 self.assertTimingAlmostEqual(put.elapsed, 0)
545
546 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
547 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
548
549 child_can_start.set()
550 parent_can_continue.wait()
551
552 self.assertEqual(queue_empty(queue), True)
553 self.assertEqual(queue_full(queue, MAXSIZE), False)
554
555 proc.join()
556
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000557 @classmethod
558 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000559 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000560 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000561 queue.put(2)
562 queue.put(3)
563 queue.put(4)
564 queue.put(5)
565 parent_can_continue.set()
566
567 def test_get(self):
568 queue = self.Queue()
569 child_can_start = self.Event()
570 parent_can_continue = self.Event()
571
572 proc = self.Process(
573 target=self._test_get,
574 args=(queue, child_can_start, parent_can_continue)
575 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000576 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000577 proc.start()
578
579 self.assertEqual(queue_empty(queue), True)
580
581 child_can_start.set()
582 parent_can_continue.wait()
583
584 time.sleep(DELTA)
585 self.assertEqual(queue_empty(queue), False)
586
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000587 # Hangs unexpectedly, remove for now
588 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000589 self.assertEqual(queue.get(True, None), 2)
590 self.assertEqual(queue.get(True), 3)
591 self.assertEqual(queue.get(timeout=1), 4)
592 self.assertEqual(queue.get_nowait(), 5)
593
594 self.assertEqual(queue_empty(queue), True)
595
596 get = TimingWrapper(queue.get)
597 get_nowait = TimingWrapper(queue.get_nowait)
598
599 self.assertRaises(pyqueue.Empty, get, False)
600 self.assertTimingAlmostEqual(get.elapsed, 0)
601
602 self.assertRaises(pyqueue.Empty, get, False, None)
603 self.assertTimingAlmostEqual(get.elapsed, 0)
604
605 self.assertRaises(pyqueue.Empty, get_nowait)
606 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
607
608 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
609 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
610
611 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
612 self.assertTimingAlmostEqual(get.elapsed, 0)
613
614 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
615 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
616
617 proc.join()
618
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000619 @classmethod
620 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000621 for i in range(10, 20):
622 queue.put(i)
623 # note that at this point the items may only be buffered, so the
624 # process cannot shutdown until the feeder thread has finished
625 # pushing items onto the pipe.
626
627 def test_fork(self):
628 # Old versions of Queue would fail to create a new feeder
629 # thread for a forked process if the original process had its
630 # own feeder thread. This test checks that this no longer
631 # happens.
632
633 queue = self.Queue()
634
635 # put items on queue so that main process starts a feeder thread
636 for i in range(10):
637 queue.put(i)
638
639 # wait to make sure thread starts before we fork a new process
640 time.sleep(DELTA)
641
642 # fork process
643 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200644 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000645 p.start()
646
647 # check that all expected items are in the queue
648 for i in range(20):
649 self.assertEqual(queue.get(), i)
650 self.assertRaises(pyqueue.Empty, queue.get, False)
651
652 p.join()
653
654 def test_qsize(self):
655 q = self.Queue()
656 try:
657 self.assertEqual(q.qsize(), 0)
658 except NotImplementedError:
659 return
660 q.put(1)
661 self.assertEqual(q.qsize(), 1)
662 q.put(5)
663 self.assertEqual(q.qsize(), 2)
664 q.get()
665 self.assertEqual(q.qsize(), 1)
666 q.get()
667 self.assertEqual(q.qsize(), 0)
668
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000669 @classmethod
670 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000671 for obj in iter(q.get, None):
672 time.sleep(DELTA)
673 q.task_done()
674
675 def test_task_done(self):
676 queue = self.JoinableQueue()
677
678 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000679 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000680
681 workers = [self.Process(target=self._test_task_done, args=(queue,))
682 for i in range(4)]
683
684 for p in workers:
Jesus Cea94f964f2011-09-09 20:26:57 +0200685 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000686 p.start()
687
688 for i in range(10):
689 queue.put(i)
690
691 queue.join()
692
693 for p in workers:
694 queue.put(None)
695
696 for p in workers:
697 p.join()
698
699#
700#
701#
702
703class _TestLock(BaseTestCase):
704
705 def test_lock(self):
706 lock = self.Lock()
707 self.assertEqual(lock.acquire(), True)
708 self.assertEqual(lock.acquire(False), False)
709 self.assertEqual(lock.release(), None)
710 self.assertRaises((ValueError, threading.ThreadError), lock.release)
711
712 def test_rlock(self):
713 lock = self.RLock()
714 self.assertEqual(lock.acquire(), True)
715 self.assertEqual(lock.acquire(), True)
716 self.assertEqual(lock.acquire(), True)
717 self.assertEqual(lock.release(), None)
718 self.assertEqual(lock.release(), None)
719 self.assertEqual(lock.release(), None)
720 self.assertRaises((AssertionError, RuntimeError), lock.release)
721
Jesse Nollerf8d00852009-03-31 03:25:07 +0000722 def test_lock_context(self):
723 with self.Lock():
724 pass
725
Benjamin Petersone711caf2008-06-11 16:44:04 +0000726
727class _TestSemaphore(BaseTestCase):
728
729 def _test_semaphore(self, sem):
730 self.assertReturnsIfImplemented(2, get_value, sem)
731 self.assertEqual(sem.acquire(), True)
732 self.assertReturnsIfImplemented(1, get_value, sem)
733 self.assertEqual(sem.acquire(), True)
734 self.assertReturnsIfImplemented(0, get_value, sem)
735 self.assertEqual(sem.acquire(False), False)
736 self.assertReturnsIfImplemented(0, get_value, sem)
737 self.assertEqual(sem.release(), None)
738 self.assertReturnsIfImplemented(1, get_value, sem)
739 self.assertEqual(sem.release(), None)
740 self.assertReturnsIfImplemented(2, get_value, sem)
741
742 def test_semaphore(self):
743 sem = self.Semaphore(2)
744 self._test_semaphore(sem)
745 self.assertEqual(sem.release(), None)
746 self.assertReturnsIfImplemented(3, get_value, sem)
747 self.assertEqual(sem.release(), None)
748 self.assertReturnsIfImplemented(4, get_value, sem)
749
750 def test_bounded_semaphore(self):
751 sem = self.BoundedSemaphore(2)
752 self._test_semaphore(sem)
753 # Currently fails on OS/X
754 #if HAVE_GETVALUE:
755 # self.assertRaises(ValueError, sem.release)
756 # self.assertReturnsIfImplemented(2, get_value, sem)
757
758 def test_timeout(self):
759 if self.TYPE != 'processes':
760 return
761
762 sem = self.Semaphore(0)
763 acquire = TimingWrapper(sem.acquire)
764
765 self.assertEqual(acquire(False), False)
766 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
767
768 self.assertEqual(acquire(False, None), False)
769 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
770
771 self.assertEqual(acquire(False, TIMEOUT1), False)
772 self.assertTimingAlmostEqual(acquire.elapsed, 0)
773
774 self.assertEqual(acquire(True, TIMEOUT2), False)
775 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
776
777 self.assertEqual(acquire(timeout=TIMEOUT3), False)
778 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
779
780
781class _TestCondition(BaseTestCase):
782
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000783 @classmethod
784 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000785 cond.acquire()
786 sleeping.release()
787 cond.wait(timeout)
788 woken.release()
789 cond.release()
790
791 def check_invariant(self, cond):
792 # this is only supposed to succeed when there are no sleepers
793 if self.TYPE == 'processes':
794 try:
795 sleepers = (cond._sleeping_count.get_value() -
796 cond._woken_count.get_value())
797 self.assertEqual(sleepers, 0)
798 self.assertEqual(cond._wait_semaphore.get_value(), 0)
799 except NotImplementedError:
800 pass
801
802 def test_notify(self):
803 cond = self.Condition()
804 sleeping = self.Semaphore(0)
805 woken = self.Semaphore(0)
806
807 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000808 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000809 p.start()
810
811 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000812 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000813 p.start()
814
815 # wait for both children to start sleeping
816 sleeping.acquire()
817 sleeping.acquire()
818
819 # check no process/thread has woken up
820 time.sleep(DELTA)
821 self.assertReturnsIfImplemented(0, get_value, woken)
822
823 # wake up one process/thread
824 cond.acquire()
825 cond.notify()
826 cond.release()
827
828 # check one process/thread has woken up
829 time.sleep(DELTA)
830 self.assertReturnsIfImplemented(1, get_value, woken)
831
832 # wake up another
833 cond.acquire()
834 cond.notify()
835 cond.release()
836
837 # check other has woken up
838 time.sleep(DELTA)
839 self.assertReturnsIfImplemented(2, get_value, woken)
840
841 # check state is not mucked up
842 self.check_invariant(cond)
843 p.join()
844
845 def test_notify_all(self):
846 cond = self.Condition()
847 sleeping = self.Semaphore(0)
848 woken = self.Semaphore(0)
849
850 # start some threads/processes which will timeout
851 for i in range(3):
852 p = self.Process(target=self.f,
853 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000854 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000855 p.start()
856
857 t = threading.Thread(target=self.f,
858 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000859 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000860 t.start()
861
862 # wait for them all to sleep
863 for i in range(6):
864 sleeping.acquire()
865
866 # check they have all timed out
867 for i in range(6):
868 woken.acquire()
869 self.assertReturnsIfImplemented(0, get_value, woken)
870
871 # check state is not mucked up
872 self.check_invariant(cond)
873
874 # start some more threads/processes
875 for i in range(3):
876 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000877 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000878 p.start()
879
880 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000881 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000882 t.start()
883
884 # wait for them to all sleep
885 for i in range(6):
886 sleeping.acquire()
887
888 # check no process/thread has woken up
889 time.sleep(DELTA)
890 self.assertReturnsIfImplemented(0, get_value, woken)
891
892 # wake them all up
893 cond.acquire()
894 cond.notify_all()
895 cond.release()
896
897 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200898 for i in range(10):
899 try:
900 if get_value(woken) == 6:
901 break
902 except NotImplementedError:
903 break
904 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000905 self.assertReturnsIfImplemented(6, get_value, woken)
906
907 # check state is not mucked up
908 self.check_invariant(cond)
909
910 def test_timeout(self):
911 cond = self.Condition()
912 wait = TimingWrapper(cond.wait)
913 cond.acquire()
914 res = wait(TIMEOUT1)
915 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000916 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000917 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
918
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200919 @classmethod
920 def _test_waitfor_f(cls, cond, state):
921 with cond:
922 state.value = 0
923 cond.notify()
924 result = cond.wait_for(lambda : state.value==4)
925 if not result or state.value != 4:
926 sys.exit(1)
927
928 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
929 def test_waitfor(self):
930 # based on test in test/lock_tests.py
931 cond = self.Condition()
932 state = self.Value('i', -1)
933
934 p = self.Process(target=self._test_waitfor_f, args=(cond, state))
935 p.daemon = True
936 p.start()
937
938 with cond:
939 result = cond.wait_for(lambda : state.value==0)
940 self.assertTrue(result)
941 self.assertEqual(state.value, 0)
942
943 for i in range(4):
944 time.sleep(0.01)
945 with cond:
946 state.value += 1
947 cond.notify()
948
949 p.join(5)
950 self.assertFalse(p.is_alive())
951 self.assertEqual(p.exitcode, 0)
952
953 @classmethod
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100954 def _test_waitfor_timeout_f(cls, cond, state, success, sem):
955 sem.release()
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200956 with cond:
957 expected = 0.1
958 dt = time.time()
959 result = cond.wait_for(lambda : state.value==4, timeout=expected)
960 dt = time.time() - dt
961 # borrow logic in assertTimeout() from test/lock_tests.py
962 if not result and expected * 0.6 < dt < expected * 10.0:
963 success.value = True
964
965 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
966 def test_waitfor_timeout(self):
967 # based on test in test/lock_tests.py
968 cond = self.Condition()
969 state = self.Value('i', 0)
970 success = self.Value('i', False)
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100971 sem = self.Semaphore(0)
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200972
973 p = self.Process(target=self._test_waitfor_timeout_f,
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100974 args=(cond, state, success, sem))
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200975 p.daemon = True
976 p.start()
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100977 self.assertTrue(sem.acquire(timeout=10))
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200978
979 # Only increment 3 times, so state == 4 is never reached.
980 for i in range(3):
981 time.sleep(0.01)
982 with cond:
983 state.value += 1
984 cond.notify()
985
986 p.join(5)
987 self.assertTrue(success.value)
988
Richard Oudkerk98449932012-06-05 13:15:29 +0100989 @classmethod
990 def _test_wait_result(cls, c, pid):
991 with c:
992 c.notify()
993 time.sleep(1)
994 if pid is not None:
995 os.kill(pid, signal.SIGINT)
996
997 def test_wait_result(self):
998 if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
999 pid = os.getpid()
1000 else:
1001 pid = None
1002
1003 c = self.Condition()
1004 with c:
1005 self.assertFalse(c.wait(0))
1006 self.assertFalse(c.wait(0.1))
1007
1008 p = self.Process(target=self._test_wait_result, args=(c, pid))
1009 p.start()
1010
1011 self.assertTrue(c.wait(10))
1012 if pid is not None:
1013 self.assertRaises(KeyboardInterrupt, c.wait, 10)
1014
1015 p.join()
1016
Benjamin Petersone711caf2008-06-11 16:44:04 +00001017
1018class _TestEvent(BaseTestCase):
1019
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001020 @classmethod
1021 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001022 time.sleep(TIMEOUT2)
1023 event.set()
1024
1025 def test_event(self):
1026 event = self.Event()
1027 wait = TimingWrapper(event.wait)
1028
Ezio Melotti13925002011-03-16 11:05:33 +02001029 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +00001030 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +00001031 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001032
Benjamin Peterson965ce872009-04-05 21:24:58 +00001033 # Removed, threading.Event.wait() will return the value of the __flag
1034 # instead of None. API Shear with the semaphore backed mp.Event
1035 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001036 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +00001037 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001038 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
1039
1040 event.set()
1041
1042 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +00001043 self.assertEqual(event.is_set(), True)
1044 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001045 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +00001046 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001047 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1048 # self.assertEqual(event.is_set(), True)
1049
1050 event.clear()
1051
1052 #self.assertEqual(event.is_set(), False)
1053
Jesus Cea94f964f2011-09-09 20:26:57 +02001054 p = self.Process(target=self._test_event, args=(event,))
1055 p.daemon = True
1056 p.start()
Benjamin Peterson965ce872009-04-05 21:24:58 +00001057 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001058
1059#
1060#
1061#
1062
1063class _TestValue(BaseTestCase):
1064
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001065 ALLOWED_TYPES = ('processes',)
1066
Benjamin Petersone711caf2008-06-11 16:44:04 +00001067 codes_values = [
1068 ('i', 4343, 24234),
1069 ('d', 3.625, -4.25),
1070 ('h', -232, 234),
1071 ('c', latin('x'), latin('y'))
1072 ]
1073
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00001074 def setUp(self):
1075 if not HAS_SHAREDCTYPES:
1076 self.skipTest("requires multiprocessing.sharedctypes")
1077
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001078 @classmethod
1079 def _test(cls, values):
1080 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001081 sv.value = cv[2]
1082
1083
1084 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001085 if raw:
1086 values = [self.RawValue(code, value)
1087 for code, value, _ in self.codes_values]
1088 else:
1089 values = [self.Value(code, value)
1090 for code, value, _ in self.codes_values]
1091
1092 for sv, cv in zip(values, self.codes_values):
1093 self.assertEqual(sv.value, cv[1])
1094
1095 proc = self.Process(target=self._test, args=(values,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001096 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001097 proc.start()
1098 proc.join()
1099
1100 for sv, cv in zip(values, self.codes_values):
1101 self.assertEqual(sv.value, cv[2])
1102
1103 def test_rawvalue(self):
1104 self.test_value(raw=True)
1105
1106 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001107 val1 = self.Value('i', 5)
1108 lock1 = val1.get_lock()
1109 obj1 = val1.get_obj()
1110
1111 val2 = self.Value('i', 5, lock=None)
1112 lock2 = val2.get_lock()
1113 obj2 = val2.get_obj()
1114
1115 lock = self.Lock()
1116 val3 = self.Value('i', 5, lock=lock)
1117 lock3 = val3.get_lock()
1118 obj3 = val3.get_obj()
1119 self.assertEqual(lock, lock3)
1120
Jesse Nollerb0516a62009-01-18 03:11:38 +00001121 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001122 self.assertFalse(hasattr(arr4, 'get_lock'))
1123 self.assertFalse(hasattr(arr4, 'get_obj'))
1124
Jesse Nollerb0516a62009-01-18 03:11:38 +00001125 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
1126
1127 arr5 = self.RawValue('i', 5)
1128 self.assertFalse(hasattr(arr5, 'get_lock'))
1129 self.assertFalse(hasattr(arr5, 'get_obj'))
1130
Benjamin Petersone711caf2008-06-11 16:44:04 +00001131
1132class _TestArray(BaseTestCase):
1133
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001134 ALLOWED_TYPES = ('processes',)
1135
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001136 @classmethod
1137 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001138 for i in range(1, len(seq)):
1139 seq[i] += seq[i-1]
1140
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001141 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001142 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001143 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
1144 if raw:
1145 arr = self.RawArray('i', seq)
1146 else:
1147 arr = self.Array('i', seq)
1148
1149 self.assertEqual(len(arr), len(seq))
1150 self.assertEqual(arr[3], seq[3])
1151 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
1152
1153 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
1154
1155 self.assertEqual(list(arr[:]), seq)
1156
1157 self.f(seq)
1158
1159 p = self.Process(target=self.f, args=(arr,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001160 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001161 p.start()
1162 p.join()
1163
1164 self.assertEqual(list(arr[:]), seq)
1165
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001166 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +00001167 def test_array_from_size(self):
1168 size = 10
1169 # Test for zeroing (see issue #11675).
1170 # The repetition below strengthens the test by increasing the chances
1171 # of previously allocated non-zero memory being used for the new array
1172 # on the 2nd and 3rd loops.
1173 for _ in range(3):
1174 arr = self.Array('i', size)
1175 self.assertEqual(len(arr), size)
1176 self.assertEqual(list(arr), [0] * size)
1177 arr[:] = range(10)
1178 self.assertEqual(list(arr), list(range(10)))
1179 del arr
1180
1181 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001182 def test_rawarray(self):
1183 self.test_array(raw=True)
1184
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001185 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001186 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001187 arr1 = self.Array('i', list(range(10)))
1188 lock1 = arr1.get_lock()
1189 obj1 = arr1.get_obj()
1190
1191 arr2 = self.Array('i', list(range(10)), lock=None)
1192 lock2 = arr2.get_lock()
1193 obj2 = arr2.get_obj()
1194
1195 lock = self.Lock()
1196 arr3 = self.Array('i', list(range(10)), lock=lock)
1197 lock3 = arr3.get_lock()
1198 obj3 = arr3.get_obj()
1199 self.assertEqual(lock, lock3)
1200
Jesse Nollerb0516a62009-01-18 03:11:38 +00001201 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001202 self.assertFalse(hasattr(arr4, 'get_lock'))
1203 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +00001204 self.assertRaises(AttributeError,
1205 self.Array, 'i', range(10), lock='notalock')
1206
1207 arr5 = self.RawArray('i', range(10))
1208 self.assertFalse(hasattr(arr5, 'get_lock'))
1209 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001210
1211#
1212#
1213#
1214
1215class _TestContainers(BaseTestCase):
1216
1217 ALLOWED_TYPES = ('manager',)
1218
1219 def test_list(self):
1220 a = self.list(list(range(10)))
1221 self.assertEqual(a[:], list(range(10)))
1222
1223 b = self.list()
1224 self.assertEqual(b[:], [])
1225
1226 b.extend(list(range(5)))
1227 self.assertEqual(b[:], list(range(5)))
1228
1229 self.assertEqual(b[2], 2)
1230 self.assertEqual(b[2:10], [2,3,4])
1231
1232 b *= 2
1233 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1234
1235 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1236
1237 self.assertEqual(a[:], list(range(10)))
1238
1239 d = [a, b]
1240 e = self.list(d)
1241 self.assertEqual(
1242 e[:],
1243 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1244 )
1245
1246 f = self.list([a])
1247 a.append('hello')
1248 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1249
1250 def test_dict(self):
1251 d = self.dict()
1252 indices = list(range(65, 70))
1253 for i in indices:
1254 d[i] = chr(i)
1255 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1256 self.assertEqual(sorted(d.keys()), indices)
1257 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1258 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1259
1260 def test_namespace(self):
1261 n = self.Namespace()
1262 n.name = 'Bob'
1263 n.job = 'Builder'
1264 n._hidden = 'hidden'
1265 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1266 del n.job
1267 self.assertEqual(str(n), "Namespace(name='Bob')")
1268 self.assertTrue(hasattr(n, 'name'))
1269 self.assertTrue(not hasattr(n, 'job'))
1270
1271#
1272#
1273#
1274
1275def sqr(x, wait=0.0):
1276 time.sleep(wait)
1277 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00001278
Antoine Pitroude911b22011-12-21 11:03:24 +01001279def mul(x, y):
1280 return x*y
1281
Benjamin Petersone711caf2008-06-11 16:44:04 +00001282class _TestPool(BaseTestCase):
1283
1284 def test_apply(self):
1285 papply = self.pool.apply
1286 self.assertEqual(papply(sqr, (5,)), sqr(5))
1287 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1288
1289 def test_map(self):
1290 pmap = self.pool.map
1291 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1292 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1293 list(map(sqr, list(range(100)))))
1294
Antoine Pitroude911b22011-12-21 11:03:24 +01001295 def test_starmap(self):
1296 psmap = self.pool.starmap
1297 tuples = list(zip(range(10), range(9,-1, -1)))
1298 self.assertEqual(psmap(mul, tuples),
1299 list(itertools.starmap(mul, tuples)))
1300 tuples = list(zip(range(100), range(99,-1, -1)))
1301 self.assertEqual(psmap(mul, tuples, chunksize=20),
1302 list(itertools.starmap(mul, tuples)))
1303
1304 def test_starmap_async(self):
1305 tuples = list(zip(range(100), range(99,-1, -1)))
1306 self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
1307 list(itertools.starmap(mul, tuples)))
1308
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001309 def test_map_chunksize(self):
1310 try:
1311 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1312 except multiprocessing.TimeoutError:
1313 self.fail("pool.map_async with chunksize stalled on null list")
1314
Benjamin Petersone711caf2008-06-11 16:44:04 +00001315 def test_async(self):
1316 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1317 get = TimingWrapper(res.get)
1318 self.assertEqual(get(), 49)
1319 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1320
1321 def test_async_timeout(self):
1322 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1323 get = TimingWrapper(res.get)
1324 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1325 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1326
1327 def test_imap(self):
1328 it = self.pool.imap(sqr, list(range(10)))
1329 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1330
1331 it = self.pool.imap(sqr, list(range(10)))
1332 for i in range(10):
1333 self.assertEqual(next(it), i*i)
1334 self.assertRaises(StopIteration, it.__next__)
1335
1336 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1337 for i in range(1000):
1338 self.assertEqual(next(it), i*i)
1339 self.assertRaises(StopIteration, it.__next__)
1340
1341 def test_imap_unordered(self):
1342 it = self.pool.imap_unordered(sqr, list(range(1000)))
1343 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1344
1345 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1346 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1347
1348 def test_make_pool(self):
Victor Stinner2fae27b2011-06-20 17:53:35 +02001349 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1350 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1351
Benjamin Petersone711caf2008-06-11 16:44:04 +00001352 p = multiprocessing.Pool(3)
1353 self.assertEqual(3, len(p._pool))
1354 p.close()
1355 p.join()
1356
1357 def test_terminate(self):
1358 if self.TYPE == 'manager':
1359 # On Unix a forked process increfs each shared object to
1360 # which its parent process held a reference. If the
1361 # forked process gets terminated then there is likely to
1362 # be a reference leak. So to prevent
1363 # _TestZZZNumberOfObjects from failing we skip this test
1364 # when using a manager.
1365 return
1366
1367 result = self.pool.map_async(
1368 time.sleep, [0.1 for i in range(10000)], chunksize=1
1369 )
1370 self.pool.terminate()
1371 join = TimingWrapper(self.pool.join)
1372 join()
Victor Stinner900189b2011-03-24 16:39:07 +01001373 self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001374
Richard Oudkerke41682b2012-06-06 19:04:57 +01001375 def test_empty_iterable(self):
1376 # See Issue 12157
1377 p = self.Pool(1)
1378
1379 self.assertEqual(p.map(sqr, []), [])
1380 self.assertEqual(list(p.imap(sqr, [])), [])
1381 self.assertEqual(list(p.imap_unordered(sqr, [])), [])
1382 self.assertEqual(p.map_async(sqr, []).get(), [])
1383
1384 p.close()
1385 p.join()
1386
Ask Solem2afcbf22010-11-09 20:55:52 +00001387def raising():
1388 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00001389
Ask Solem2afcbf22010-11-09 20:55:52 +00001390def unpickleable_result():
1391 return lambda: 42
1392
1393class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00001394 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00001395
1396 def test_async_error_callback(self):
1397 p = multiprocessing.Pool(2)
1398
1399 scratchpad = [None]
1400 def errback(exc):
1401 scratchpad[0] = exc
1402
1403 res = p.apply_async(raising, error_callback=errback)
1404 self.assertRaises(KeyError, res.get)
1405 self.assertTrue(scratchpad[0])
1406 self.assertIsInstance(scratchpad[0], KeyError)
1407
1408 p.close()
1409 p.join()
1410
1411 def test_unpickleable_result(self):
1412 from multiprocessing.pool import MaybeEncodingError
1413 p = multiprocessing.Pool(2)
1414
1415 # Make sure we don't lose pool processes because of encoding errors.
1416 for iteration in range(20):
1417
1418 scratchpad = [None]
1419 def errback(exc):
1420 scratchpad[0] = exc
1421
1422 res = p.apply_async(unpickleable_result, error_callback=errback)
1423 self.assertRaises(MaybeEncodingError, res.get)
1424 wrapped = scratchpad[0]
1425 self.assertTrue(wrapped)
1426 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
1427 self.assertIsNotNone(wrapped.exc)
1428 self.assertIsNotNone(wrapped.value)
1429
1430 p.close()
1431 p.join()
1432
1433class _TestPoolWorkerLifetime(BaseTestCase):
1434 ALLOWED_TYPES = ('processes', )
1435
Jesse Noller1f0b6582010-01-27 03:36:01 +00001436 def test_pool_worker_lifetime(self):
1437 p = multiprocessing.Pool(3, maxtasksperchild=10)
1438 self.assertEqual(3, len(p._pool))
1439 origworkerpids = [w.pid for w in p._pool]
1440 # Run many tasks so each worker gets replaced (hopefully)
1441 results = []
1442 for i in range(100):
1443 results.append(p.apply_async(sqr, (i, )))
1444 # Fetch the results and verify we got the right answers,
1445 # also ensuring all the tasks have completed.
1446 for (j, res) in enumerate(results):
1447 self.assertEqual(res.get(), sqr(j))
1448 # Refill the pool
1449 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001450 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02001451 # (countdown * DELTA = 5 seconds max startup process time)
1452 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00001453 while countdown and not all(w.is_alive() for w in p._pool):
1454 countdown -= 1
1455 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001456 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001457 # All pids should be assigned. See issue #7805.
1458 self.assertNotIn(None, origworkerpids)
1459 self.assertNotIn(None, finalworkerpids)
1460 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001461 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1462 p.close()
1463 p.join()
1464
Charles-François Natalif8859e12011-10-24 18:45:29 +02001465 def test_pool_worker_lifetime_early_close(self):
1466 # Issue #10332: closing a pool whose workers have limited lifetimes
1467 # before all the tasks completed would make join() hang.
1468 p = multiprocessing.Pool(3, maxtasksperchild=1)
1469 results = []
1470 for i in range(6):
1471 results.append(p.apply_async(sqr, (i, 0.3)))
1472 p.close()
1473 p.join()
1474 # check the results
1475 for (j, res) in enumerate(results):
1476 self.assertEqual(res.get(), sqr(j))
1477
1478
Benjamin Petersone711caf2008-06-11 16:44:04 +00001479#
1480# Test that manager has expected number of shared objects left
1481#
1482
1483class _TestZZZNumberOfObjects(BaseTestCase):
1484 # Because test cases are sorted alphabetically, this one will get
1485 # run after all the other tests for the manager. It tests that
1486 # there have been no "reference leaks" for the manager's shared
1487 # objects. Note the comment in _TestPool.test_terminate().
1488 ALLOWED_TYPES = ('manager',)
1489
1490 def test_number_of_objects(self):
1491 EXPECTED_NUMBER = 1 # the pool object is still alive
1492 multiprocessing.active_children() # discard dead process objs
1493 gc.collect() # do garbage collection
1494 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001495 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001496 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001497 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001498 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001499
1500 self.assertEqual(refs, EXPECTED_NUMBER)
1501
1502#
1503# Test of creating a customized manager class
1504#
1505
1506from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1507
1508class FooBar(object):
1509 def f(self):
1510 return 'f()'
1511 def g(self):
1512 raise ValueError
1513 def _h(self):
1514 return '_h()'
1515
1516def baz():
1517 for i in range(10):
1518 yield i*i
1519
1520class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00001521 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001522 def __iter__(self):
1523 return self
1524 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001525 return self._callmethod('__next__')
1526
1527class MyManager(BaseManager):
1528 pass
1529
1530MyManager.register('Foo', callable=FooBar)
1531MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1532MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1533
1534
1535class _TestMyManager(BaseTestCase):
1536
1537 ALLOWED_TYPES = ('manager',)
1538
1539 def test_mymanager(self):
1540 manager = MyManager()
1541 manager.start()
1542
1543 foo = manager.Foo()
1544 bar = manager.Bar()
1545 baz = manager.baz()
1546
1547 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1548 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1549
1550 self.assertEqual(foo_methods, ['f', 'g'])
1551 self.assertEqual(bar_methods, ['f', '_h'])
1552
1553 self.assertEqual(foo.f(), 'f()')
1554 self.assertRaises(ValueError, foo.g)
1555 self.assertEqual(foo._callmethod('f'), 'f()')
1556 self.assertRaises(RemoteError, foo._callmethod, '_h')
1557
1558 self.assertEqual(bar.f(), 'f()')
1559 self.assertEqual(bar._h(), '_h()')
1560 self.assertEqual(bar._callmethod('f'), 'f()')
1561 self.assertEqual(bar._callmethod('_h'), '_h()')
1562
1563 self.assertEqual(list(baz), [i*i for i in range(10)])
1564
1565 manager.shutdown()
1566
Richard Oudkerk73d9a292012-06-14 15:30:10 +01001567 # If the manager process exited cleanly then the exitcode
1568 # will be zero. Otherwise (after a short timeout)
1569 # terminate() is used, resulting in an exitcode of -SIGTERM.
1570 self.assertEqual(manager._process.exitcode, 0)
1571
Benjamin Petersone711caf2008-06-11 16:44:04 +00001572#
1573# Test of connecting to a remote server and using xmlrpclib for serialization
1574#
1575
1576_queue = pyqueue.Queue()
1577def get_queue():
1578 return _queue
1579
1580class QueueManager(BaseManager):
1581 '''manager class used by server process'''
1582QueueManager.register('get_queue', callable=get_queue)
1583
1584class QueueManager2(BaseManager):
1585 '''manager class which specifies the same interface as QueueManager'''
1586QueueManager2.register('get_queue')
1587
1588
1589SERIALIZER = 'xmlrpclib'
1590
1591class _TestRemoteManager(BaseTestCase):
1592
1593 ALLOWED_TYPES = ('manager',)
1594
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001595 @classmethod
1596 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001597 manager = QueueManager2(
1598 address=address, authkey=authkey, serializer=SERIALIZER
1599 )
1600 manager.connect()
1601 queue = manager.get_queue()
1602 queue.put(('hello world', None, True, 2.25))
1603
1604 def test_remote(self):
1605 authkey = os.urandom(32)
1606
1607 manager = QueueManager(
1608 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1609 )
1610 manager.start()
1611
1612 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001613 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001614 p.start()
1615
1616 manager2 = QueueManager2(
1617 address=manager.address, authkey=authkey, serializer=SERIALIZER
1618 )
1619 manager2.connect()
1620 queue = manager2.get_queue()
1621
1622 # Note that xmlrpclib will deserialize object as a list not a tuple
1623 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1624
1625 # Because we are using xmlrpclib for serialization instead of
1626 # pickle this will cause a serialization error.
1627 self.assertRaises(Exception, queue.put, time.sleep)
1628
1629 # Make queue finalizer run before the server is stopped
1630 del queue
1631 manager.shutdown()
1632
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001633class _TestManagerRestart(BaseTestCase):
1634
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001635 @classmethod
1636 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001637 manager = QueueManager(
1638 address=address, authkey=authkey, serializer=SERIALIZER)
1639 manager.connect()
1640 queue = manager.get_queue()
1641 queue.put('hello world')
1642
1643 def test_rapid_restart(self):
1644 authkey = os.urandom(32)
1645 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001646 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin50be1ca2010-11-01 05:10:44 +00001647 srvr = manager.get_server()
1648 addr = srvr.address
1649 # Close the connection.Listener socket which gets opened as a part
1650 # of manager.get_server(). It's not needed for the test.
1651 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001652 manager.start()
1653
1654 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001655 p.daemon = True
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001656 p.start()
1657 queue = manager.get_queue()
1658 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001659 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001660 manager.shutdown()
1661 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001662 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02001663 try:
1664 manager.start()
1665 except IOError as e:
1666 if e.errno != errno.EADDRINUSE:
1667 raise
1668 # Retry after some time, in case the old socket was lingering
1669 # (sporadic failure on buildbots)
1670 time.sleep(1.0)
1671 manager = QueueManager(
1672 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller35d1f002009-03-30 22:59:27 +00001673 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001674
Benjamin Petersone711caf2008-06-11 16:44:04 +00001675#
1676#
1677#
1678
1679SENTINEL = latin('')
1680
1681class _TestConnection(BaseTestCase):
1682
1683 ALLOWED_TYPES = ('processes', 'threads')
1684
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001685 @classmethod
1686 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001687 for msg in iter(conn.recv_bytes, SENTINEL):
1688 conn.send_bytes(msg)
1689 conn.close()
1690
1691 def test_connection(self):
1692 conn, child_conn = self.Pipe()
1693
1694 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001695 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001696 p.start()
1697
1698 seq = [1, 2.25, None]
1699 msg = latin('hello world')
1700 longmsg = msg * 10
1701 arr = array.array('i', list(range(4)))
1702
1703 if self.TYPE == 'processes':
1704 self.assertEqual(type(conn.fileno()), int)
1705
1706 self.assertEqual(conn.send(seq), None)
1707 self.assertEqual(conn.recv(), seq)
1708
1709 self.assertEqual(conn.send_bytes(msg), None)
1710 self.assertEqual(conn.recv_bytes(), msg)
1711
1712 if self.TYPE == 'processes':
1713 buffer = array.array('i', [0]*10)
1714 expected = list(arr) + [0] * (10 - len(arr))
1715 self.assertEqual(conn.send_bytes(arr), None)
1716 self.assertEqual(conn.recv_bytes_into(buffer),
1717 len(arr) * buffer.itemsize)
1718 self.assertEqual(list(buffer), expected)
1719
1720 buffer = array.array('i', [0]*10)
1721 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1722 self.assertEqual(conn.send_bytes(arr), None)
1723 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1724 len(arr) * buffer.itemsize)
1725 self.assertEqual(list(buffer), expected)
1726
1727 buffer = bytearray(latin(' ' * 40))
1728 self.assertEqual(conn.send_bytes(longmsg), None)
1729 try:
1730 res = conn.recv_bytes_into(buffer)
1731 except multiprocessing.BufferTooShort as e:
1732 self.assertEqual(e.args, (longmsg,))
1733 else:
1734 self.fail('expected BufferTooShort, got %s' % res)
1735
1736 poll = TimingWrapper(conn.poll)
1737
1738 self.assertEqual(poll(), False)
1739 self.assertTimingAlmostEqual(poll.elapsed, 0)
1740
Richard Oudkerk59d54042012-05-10 16:11:12 +01001741 self.assertEqual(poll(-1), False)
1742 self.assertTimingAlmostEqual(poll.elapsed, 0)
1743
Benjamin Petersone711caf2008-06-11 16:44:04 +00001744 self.assertEqual(poll(TIMEOUT1), False)
1745 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1746
1747 conn.send(None)
1748
1749 self.assertEqual(poll(TIMEOUT1), True)
1750 self.assertTimingAlmostEqual(poll.elapsed, 0)
1751
1752 self.assertEqual(conn.recv(), None)
1753
1754 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1755 conn.send_bytes(really_big_msg)
1756 self.assertEqual(conn.recv_bytes(), really_big_msg)
1757
1758 conn.send_bytes(SENTINEL) # tell child to quit
1759 child_conn.close()
1760
1761 if self.TYPE == 'processes':
1762 self.assertEqual(conn.readable, True)
1763 self.assertEqual(conn.writable, True)
1764 self.assertRaises(EOFError, conn.recv)
1765 self.assertRaises(EOFError, conn.recv_bytes)
1766
1767 p.join()
1768
1769 def test_duplex_false(self):
1770 reader, writer = self.Pipe(duplex=False)
1771 self.assertEqual(writer.send(1), None)
1772 self.assertEqual(reader.recv(), 1)
1773 if self.TYPE == 'processes':
1774 self.assertEqual(reader.readable, True)
1775 self.assertEqual(reader.writable, False)
1776 self.assertEqual(writer.readable, False)
1777 self.assertEqual(writer.writable, True)
1778 self.assertRaises(IOError, reader.send, 2)
1779 self.assertRaises(IOError, writer.recv)
1780 self.assertRaises(IOError, writer.poll)
1781
1782 def test_spawn_close(self):
1783 # We test that a pipe connection can be closed by parent
1784 # process immediately after child is spawned. On Windows this
1785 # would have sometimes failed on old versions because
1786 # child_conn would be closed before the child got a chance to
1787 # duplicate it.
1788 conn, child_conn = self.Pipe()
1789
1790 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001791 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001792 p.start()
1793 child_conn.close() # this might complete before child initializes
1794
1795 msg = latin('hello')
1796 conn.send_bytes(msg)
1797 self.assertEqual(conn.recv_bytes(), msg)
1798
1799 conn.send_bytes(SENTINEL)
1800 conn.close()
1801 p.join()
1802
1803 def test_sendbytes(self):
1804 if self.TYPE != 'processes':
1805 return
1806
1807 msg = latin('abcdefghijklmnopqrstuvwxyz')
1808 a, b = self.Pipe()
1809
1810 a.send_bytes(msg)
1811 self.assertEqual(b.recv_bytes(), msg)
1812
1813 a.send_bytes(msg, 5)
1814 self.assertEqual(b.recv_bytes(), msg[5:])
1815
1816 a.send_bytes(msg, 7, 8)
1817 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1818
1819 a.send_bytes(msg, 26)
1820 self.assertEqual(b.recv_bytes(), latin(''))
1821
1822 a.send_bytes(msg, 26, 0)
1823 self.assertEqual(b.recv_bytes(), latin(''))
1824
1825 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1826
1827 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1828
1829 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1830
1831 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1832
1833 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1834
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001835 @classmethod
1836 def _is_fd_assigned(cls, fd):
1837 try:
1838 os.fstat(fd)
1839 except OSError as e:
1840 if e.errno == errno.EBADF:
1841 return False
1842 raise
1843 else:
1844 return True
1845
1846 @classmethod
1847 def _writefd(cls, conn, data, create_dummy_fds=False):
1848 if create_dummy_fds:
1849 for i in range(0, 256):
1850 if not cls._is_fd_assigned(i):
1851 os.dup2(conn.fileno(), i)
1852 fd = reduction.recv_handle(conn)
1853 if msvcrt:
1854 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1855 os.write(fd, data)
1856 os.close(fd)
1857
Charles-François Natalibc8f0822011-09-20 20:36:51 +02001858 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001859 def test_fd_transfer(self):
1860 if self.TYPE != 'processes':
1861 self.skipTest("only makes sense with processes")
1862 conn, child_conn = self.Pipe(duplex=True)
1863
1864 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea94f964f2011-09-09 20:26:57 +02001865 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001866 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02001867 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001868 with open(test.support.TESTFN, "wb") as f:
1869 fd = f.fileno()
1870 if msvcrt:
1871 fd = msvcrt.get_osfhandle(fd)
1872 reduction.send_handle(conn, fd, p.pid)
1873 p.join()
1874 with open(test.support.TESTFN, "rb") as f:
1875 self.assertEqual(f.read(), b"foo")
1876
Charles-François Natalibc8f0822011-09-20 20:36:51 +02001877 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001878 @unittest.skipIf(sys.platform == "win32",
1879 "test semantics don't make sense on Windows")
1880 @unittest.skipIf(MAXFD <= 256,
1881 "largest assignable fd number is too small")
1882 @unittest.skipUnless(hasattr(os, "dup2"),
1883 "test needs os.dup2()")
1884 def test_large_fd_transfer(self):
1885 # With fd > 256 (issue #11657)
1886 if self.TYPE != 'processes':
1887 self.skipTest("only makes sense with processes")
1888 conn, child_conn = self.Pipe(duplex=True)
1889
1890 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea94f964f2011-09-09 20:26:57 +02001891 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001892 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02001893 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001894 with open(test.support.TESTFN, "wb") as f:
1895 fd = f.fileno()
1896 for newfd in range(256, MAXFD):
1897 if not self._is_fd_assigned(newfd):
1898 break
1899 else:
1900 self.fail("could not find an unassigned large file descriptor")
1901 os.dup2(fd, newfd)
1902 try:
1903 reduction.send_handle(conn, newfd, p.pid)
1904 finally:
1905 os.close(newfd)
1906 p.join()
1907 with open(test.support.TESTFN, "rb") as f:
1908 self.assertEqual(f.read(), b"bar")
1909
Jesus Cea4507e642011-09-21 03:53:25 +02001910 @classmethod
1911 def _send_data_without_fd(self, conn):
1912 os.write(conn.fileno(), b"\0")
1913
Charles-François Natalie51c8da2011-09-21 18:48:21 +02001914 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Cea4507e642011-09-21 03:53:25 +02001915 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
1916 def test_missing_fd_transfer(self):
1917 # Check that exception is raised when received data is not
1918 # accompanied by a file descriptor in ancillary data.
1919 if self.TYPE != 'processes':
1920 self.skipTest("only makes sense with processes")
1921 conn, child_conn = self.Pipe(duplex=True)
1922
1923 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
1924 p.daemon = True
1925 p.start()
1926 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
1927 p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001928
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01001929class _TestListener(BaseTestCase):
1930
1931 ALLOWED_TYPES = ('processes')
1932
1933 def test_multiple_bind(self):
1934 for family in self.connection.families:
1935 l = self.connection.Listener(family=family)
1936 self.addCleanup(l.close)
1937 self.assertRaises(OSError, self.connection.Listener,
1938 l.address, family)
1939
Benjamin Petersone711caf2008-06-11 16:44:04 +00001940class _TestListenerClient(BaseTestCase):
1941
1942 ALLOWED_TYPES = ('processes', 'threads')
1943
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001944 @classmethod
1945 def _test(cls, address):
1946 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001947 conn.send('hello')
1948 conn.close()
1949
1950 def test_listener_client(self):
1951 for family in self.connection.families:
1952 l = self.connection.Listener(family=family)
1953 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001954 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001955 p.start()
1956 conn = l.accept()
1957 self.assertEqual(conn.recv(), 'hello')
1958 p.join()
1959 l.close()
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01001960
Richard Oudkerkfdb8dcf2012-05-05 19:45:37 +01001961 def test_issue14725(self):
1962 l = self.connection.Listener()
1963 p = self.Process(target=self._test, args=(l.address,))
1964 p.daemon = True
1965 p.start()
1966 time.sleep(1)
1967 # On Windows the client process should by now have connected,
1968 # written data and closed the pipe handle by now. This causes
1969 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
1970 # 14725.
1971 conn = l.accept()
1972 self.assertEqual(conn.recv(), 'hello')
1973 conn.close()
1974 p.join()
1975 l.close()
1976
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01001977class _TestPoll(unittest.TestCase):
1978
1979 ALLOWED_TYPES = ('processes', 'threads')
1980
1981 def test_empty_string(self):
1982 a, b = self.Pipe()
1983 self.assertEqual(a.poll(), False)
1984 b.send_bytes(b'')
1985 self.assertEqual(a.poll(), True)
1986 self.assertEqual(a.poll(), True)
1987
1988 @classmethod
1989 def _child_strings(cls, conn, strings):
1990 for s in strings:
1991 time.sleep(0.1)
1992 conn.send_bytes(s)
1993 conn.close()
1994
1995 def test_strings(self):
1996 strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
1997 a, b = self.Pipe()
1998 p = self.Process(target=self._child_strings, args=(b, strings))
1999 p.start()
2000
2001 for s in strings:
2002 for i in range(200):
2003 if a.poll(0.01):
2004 break
2005 x = a.recv_bytes()
2006 self.assertEqual(s, x)
2007
2008 p.join()
2009
2010 @classmethod
2011 def _child_boundaries(cls, r):
2012 # Polling may "pull" a message in to the child process, but we
2013 # don't want it to pull only part of a message, as that would
2014 # corrupt the pipe for any other processes which might later
2015 # read from it.
2016 r.poll(5)
2017
2018 def test_boundaries(self):
2019 r, w = self.Pipe(False)
2020 p = self.Process(target=self._child_boundaries, args=(r,))
2021 p.start()
2022 time.sleep(2)
2023 L = [b"first", b"second"]
2024 for obj in L:
2025 w.send_bytes(obj)
2026 w.close()
2027 p.join()
2028 self.assertIn(r.recv_bytes(), L)
2029
2030 @classmethod
2031 def _child_dont_merge(cls, b):
2032 b.send_bytes(b'a')
2033 b.send_bytes(b'b')
2034 b.send_bytes(b'cd')
2035
2036 def test_dont_merge(self):
2037 a, b = self.Pipe()
2038 self.assertEqual(a.poll(0.0), False)
2039 self.assertEqual(a.poll(0.1), False)
2040
2041 p = self.Process(target=self._child_dont_merge, args=(b,))
2042 p.start()
2043
2044 self.assertEqual(a.recv_bytes(), b'a')
2045 self.assertEqual(a.poll(1.0), True)
2046 self.assertEqual(a.poll(1.0), True)
2047 self.assertEqual(a.recv_bytes(), b'b')
2048 self.assertEqual(a.poll(1.0), True)
2049 self.assertEqual(a.poll(1.0), True)
2050 self.assertEqual(a.poll(0.0), True)
2051 self.assertEqual(a.recv_bytes(), b'cd')
2052
2053 p.join()
2054
Benjamin Petersone711caf2008-06-11 16:44:04 +00002055#
2056# Test of sending connection and socket objects between processes
2057#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002058
Richard Oudkerk24524192012-04-30 14:48:51 +01002059# Intermittent fails on Mac OS X -- see Issue14669 and Issue12958
2060@unittest.skipIf(sys.platform == "darwin", "fd passing unreliable on Mac OS X")
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002061@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Benjamin Petersone711caf2008-06-11 16:44:04 +00002062class _TestPicklingConnections(BaseTestCase):
2063
2064 ALLOWED_TYPES = ('processes',)
2065
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002066 @classmethod
Antoine Pitrou92ff4e12012-04-27 23:51:03 +02002067 def tearDownClass(cls):
2068 from multiprocessing.reduction import resource_sharer
2069 resource_sharer.stop(timeout=5)
2070
2071 @classmethod
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002072 def _listener(cls, conn, families):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002073 for fam in families:
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002074 l = cls.connection.Listener(family=fam)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002075 conn.send(l.address)
2076 new_conn = l.accept()
2077 conn.send(new_conn)
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002078 new_conn.close()
2079 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002080
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002081 l = socket.socket()
2082 l.bind(('localhost', 0))
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002083 l.listen(1)
Richard Oudkerk5d73c172012-05-08 22:24:47 +01002084 conn.send(l.getsockname())
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002085 new_conn, addr = l.accept()
2086 conn.send(new_conn)
2087 new_conn.close()
2088 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002089
2090 conn.recv()
2091
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002092 @classmethod
2093 def _remote(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002094 for (address, msg) in iter(conn.recv, None):
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002095 client = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002096 client.send(msg.upper())
2097 client.close()
2098
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002099 address, msg = conn.recv()
2100 client = socket.socket()
2101 client.connect(address)
2102 client.sendall(msg.upper())
2103 client.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002104
2105 conn.close()
2106
2107 def test_pickling(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002108 families = self.connection.families
2109
2110 lconn, lconn0 = self.Pipe()
2111 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02002112 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002113 lp.start()
2114 lconn0.close()
2115
2116 rconn, rconn0 = self.Pipe()
2117 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002118 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002119 rp.start()
2120 rconn0.close()
2121
2122 for fam in families:
2123 msg = ('This connection uses family %s' % fam).encode('ascii')
2124 address = lconn.recv()
2125 rconn.send((address, msg))
2126 new_conn = lconn.recv()
2127 self.assertEqual(new_conn.recv(), msg.upper())
2128
2129 rconn.send(None)
2130
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002131 msg = latin('This connection uses a normal socket')
2132 address = lconn.recv()
2133 rconn.send((address, msg))
2134 new_conn = lconn.recv()
Richard Oudkerk4460c342012-04-30 14:48:50 +01002135 buf = []
2136 while True:
2137 s = new_conn.recv(100)
2138 if not s:
2139 break
2140 buf.append(s)
2141 buf = b''.join(buf)
2142 self.assertEqual(buf, msg.upper())
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002143 new_conn.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002144
2145 lconn.send(None)
2146
2147 rconn.close()
2148 lconn.close()
2149
2150 lp.join()
2151 rp.join()
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002152
2153 @classmethod
2154 def child_access(cls, conn):
2155 w = conn.recv()
2156 w.send('all is well')
2157 w.close()
2158
2159 r = conn.recv()
2160 msg = r.recv()
2161 conn.send(msg*2)
2162
2163 conn.close()
2164
2165 def test_access(self):
2166 # On Windows, if we do not specify a destination pid when
2167 # using DupHandle then we need to be careful to use the
2168 # correct access flags for DuplicateHandle(), or else
2169 # DupHandle.detach() will raise PermissionError. For example,
2170 # for a read only pipe handle we should use
2171 # access=FILE_GENERIC_READ. (Unfortunately
2172 # DUPLICATE_SAME_ACCESS does not work.)
2173 conn, child_conn = self.Pipe()
2174 p = self.Process(target=self.child_access, args=(child_conn,))
2175 p.daemon = True
2176 p.start()
2177 child_conn.close()
2178
2179 r, w = self.Pipe(duplex=False)
2180 conn.send(w)
2181 w.close()
2182 self.assertEqual(r.recv(), 'all is well')
2183 r.close()
2184
2185 r, w = self.Pipe(duplex=False)
2186 conn.send(r)
2187 r.close()
2188 w.send('foobar')
2189 w.close()
2190 self.assertEqual(conn.recv(), 'foobar'*2)
2191
Benjamin Petersone711caf2008-06-11 16:44:04 +00002192#
2193#
2194#
2195
2196class _TestHeap(BaseTestCase):
2197
2198 ALLOWED_TYPES = ('processes',)
2199
2200 def test_heap(self):
2201 iterations = 5000
2202 maxblocks = 50
2203 blocks = []
2204
2205 # create and destroy lots of blocks of different sizes
2206 for i in range(iterations):
2207 size = int(random.lognormvariate(0, 1) * 1000)
2208 b = multiprocessing.heap.BufferWrapper(size)
2209 blocks.append(b)
2210 if len(blocks) > maxblocks:
2211 i = random.randrange(maxblocks)
2212 del blocks[i]
2213
2214 # get the heap object
2215 heap = multiprocessing.heap.BufferWrapper._heap
2216
2217 # verify the state of the heap
2218 all = []
2219 occupied = 0
Charles-François Natali778db492011-07-02 14:35:49 +02002220 heap._lock.acquire()
2221 self.addCleanup(heap._lock.release)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002222 for L in list(heap._len_to_seq.values()):
2223 for arena, start, stop in L:
2224 all.append((heap._arenas.index(arena), start, stop,
2225 stop-start, 'free'))
2226 for arena, start, stop in heap._allocated_blocks:
2227 all.append((heap._arenas.index(arena), start, stop,
2228 stop-start, 'occupied'))
2229 occupied += (stop-start)
2230
2231 all.sort()
2232
2233 for i in range(len(all)-1):
2234 (arena, start, stop) = all[i][:3]
2235 (narena, nstart, nstop) = all[i+1][:3]
2236 self.assertTrue((arena != narena and nstart == 0) or
2237 (stop == nstart))
2238
Charles-François Natali778db492011-07-02 14:35:49 +02002239 def test_free_from_gc(self):
2240 # Check that freeing of blocks by the garbage collector doesn't deadlock
2241 # (issue #12352).
2242 # Make sure the GC is enabled, and set lower collection thresholds to
2243 # make collections more frequent (and increase the probability of
2244 # deadlock).
2245 if not gc.isenabled():
2246 gc.enable()
2247 self.addCleanup(gc.disable)
2248 thresholds = gc.get_threshold()
2249 self.addCleanup(gc.set_threshold, *thresholds)
2250 gc.set_threshold(10)
2251
2252 # perform numerous block allocations, with cyclic references to make
2253 # sure objects are collected asynchronously by the gc
2254 for i in range(5000):
2255 a = multiprocessing.heap.BufferWrapper(1)
2256 b = multiprocessing.heap.BufferWrapper(1)
2257 # circular references
2258 a.buddy = b
2259 b.buddy = a
2260
Benjamin Petersone711caf2008-06-11 16:44:04 +00002261#
2262#
2263#
2264
Benjamin Petersone711caf2008-06-11 16:44:04 +00002265class _Foo(Structure):
2266 _fields_ = [
2267 ('x', c_int),
2268 ('y', c_double)
2269 ]
2270
2271class _TestSharedCTypes(BaseTestCase):
2272
2273 ALLOWED_TYPES = ('processes',)
2274
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00002275 def setUp(self):
2276 if not HAS_SHAREDCTYPES:
2277 self.skipTest("requires multiprocessing.sharedctypes")
2278
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002279 @classmethod
2280 def _double(cls, x, y, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002281 x.value *= 2
2282 y.value *= 2
2283 foo.x *= 2
2284 foo.y *= 2
2285 string.value *= 2
2286 for i in range(len(arr)):
2287 arr[i] *= 2
2288
2289 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002290 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00002291 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002292 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00002293 arr = self.Array('d', list(range(10)), lock=lock)
2294 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00002295 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002296
2297 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea94f964f2011-09-09 20:26:57 +02002298 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002299 p.start()
2300 p.join()
2301
2302 self.assertEqual(x.value, 14)
2303 self.assertAlmostEqual(y.value, 2.0/3.0)
2304 self.assertEqual(foo.x, 6)
2305 self.assertAlmostEqual(foo.y, 4.0)
2306 for i in range(10):
2307 self.assertAlmostEqual(arr[i], i*2)
2308 self.assertEqual(string.value, latin('hellohello'))
2309
2310 def test_synchronize(self):
2311 self.test_sharedctypes(lock=True)
2312
2313 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002314 foo = _Foo(2, 5.0)
Brian Curtinafa88b52010-10-07 01:12:19 +00002315 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002316 foo.x = 0
2317 foo.y = 0
2318 self.assertEqual(bar.x, 2)
2319 self.assertAlmostEqual(bar.y, 5.0)
2320
2321#
2322#
2323#
2324
2325class _TestFinalize(BaseTestCase):
2326
2327 ALLOWED_TYPES = ('processes',)
2328
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002329 @classmethod
2330 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002331 class Foo(object):
2332 pass
2333
2334 a = Foo()
2335 util.Finalize(a, conn.send, args=('a',))
2336 del a # triggers callback for a
2337
2338 b = Foo()
2339 close_b = util.Finalize(b, conn.send, args=('b',))
2340 close_b() # triggers callback for b
2341 close_b() # does nothing because callback has already been called
2342 del b # does nothing because callback has already been called
2343
2344 c = Foo()
2345 util.Finalize(c, conn.send, args=('c',))
2346
2347 d10 = Foo()
2348 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
2349
2350 d01 = Foo()
2351 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
2352 d02 = Foo()
2353 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
2354 d03 = Foo()
2355 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
2356
2357 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
2358
2359 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
2360
Ezio Melotti13925002011-03-16 11:05:33 +02002361 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00002362 # garbage collecting locals
2363 util._exit_function()
2364 conn.close()
2365 os._exit(0)
2366
2367 def test_finalize(self):
2368 conn, child_conn = self.Pipe()
2369
2370 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002371 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002372 p.start()
2373 p.join()
2374
2375 result = [obj for obj in iter(conn.recv, 'STOP')]
2376 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2377
2378#
2379# Test that from ... import * works for each module
2380#
2381
2382class _TestImportStar(BaseTestCase):
2383
2384 ALLOWED_TYPES = ('processes',)
2385
2386 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002387 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00002388 'multiprocessing', 'multiprocessing.connection',
2389 'multiprocessing.heap', 'multiprocessing.managers',
2390 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersone711caf2008-06-11 16:44:04 +00002391 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002392 ]
2393
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002394 if HAS_REDUCTION:
2395 modules.append('multiprocessing.reduction')
2396
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002397 if c_int is not None:
2398 # This module requires _ctypes
2399 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002400
2401 for name in modules:
2402 __import__(name)
2403 mod = sys.modules[name]
2404
2405 for attr in getattr(mod, '__all__', ()):
2406 self.assertTrue(
2407 hasattr(mod, attr),
2408 '%r does not have attribute %r' % (mod, attr)
2409 )
2410
2411#
2412# Quick test that logging works -- does not test logging output
2413#
2414
2415class _TestLogging(BaseTestCase):
2416
2417 ALLOWED_TYPES = ('processes',)
2418
2419 def test_enable_logging(self):
2420 logger = multiprocessing.get_logger()
2421 logger.setLevel(util.SUBWARNING)
2422 self.assertTrue(logger is not None)
2423 logger.debug('this will not be printed')
2424 logger.info('nor will this')
2425 logger.setLevel(LOG_LEVEL)
2426
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002427 @classmethod
2428 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002429 logger = multiprocessing.get_logger()
2430 conn.send(logger.getEffectiveLevel())
2431
2432 def test_level(self):
2433 LEVEL1 = 32
2434 LEVEL2 = 37
2435
2436 logger = multiprocessing.get_logger()
2437 root_logger = logging.getLogger()
2438 root_level = root_logger.level
2439
2440 reader, writer = multiprocessing.Pipe(duplex=False)
2441
2442 logger.setLevel(LEVEL1)
Jesus Cea94f964f2011-09-09 20:26:57 +02002443 p = self.Process(target=self._test_level, args=(writer,))
2444 p.daemon = True
2445 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002446 self.assertEqual(LEVEL1, reader.recv())
2447
2448 logger.setLevel(logging.NOTSET)
2449 root_logger.setLevel(LEVEL2)
Jesus Cea94f964f2011-09-09 20:26:57 +02002450 p = self.Process(target=self._test_level, args=(writer,))
2451 p.daemon = True
2452 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002453 self.assertEqual(LEVEL2, reader.recv())
2454
2455 root_logger.setLevel(root_level)
2456 logger.setLevel(level=LOG_LEVEL)
2457
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002458
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002459# class _TestLoggingProcessName(BaseTestCase):
2460#
2461# def handle(self, record):
2462# assert record.processName == multiprocessing.current_process().name
2463# self.__handled = True
2464#
2465# def test_logging(self):
2466# handler = logging.Handler()
2467# handler.handle = self.handle
2468# self.__handled = False
2469# # Bypass getLogger() and side-effects
2470# logger = logging.getLoggerClass()(
2471# 'multiprocessing.test.TestLoggingProcessName')
2472# logger.addHandler(handler)
2473# logger.propagate = False
2474#
2475# logger.warn('foo')
2476# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002477
Benjamin Petersone711caf2008-06-11 16:44:04 +00002478#
Jesse Noller6214edd2009-01-19 16:23:53 +00002479# Test to verify handle verification, see issue 3321
2480#
2481
2482class TestInvalidHandle(unittest.TestCase):
2483
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002484 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00002485 def test_invalid_handles(self):
Antoine Pitrou87cf2202011-05-09 17:04:27 +02002486 conn = multiprocessing.connection.Connection(44977608)
2487 try:
2488 self.assertRaises((ValueError, IOError), conn.poll)
2489 finally:
2490 # Hack private attribute _handle to avoid printing an error
2491 # in conn.__del__
2492 conn._handle = None
2493 self.assertRaises((ValueError, IOError),
2494 multiprocessing.connection.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002495
Jesse Noller6214edd2009-01-19 16:23:53 +00002496#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002497# Functions used to create test cases from the base ones in this module
2498#
2499
2500def get_attributes(Source, names):
2501 d = {}
2502 for name in names:
2503 obj = getattr(Source, name)
2504 if type(obj) == type(get_attributes):
2505 obj = staticmethod(obj)
2506 d[name] = obj
2507 return d
2508
2509def create_test_cases(Mixin, type):
2510 result = {}
2511 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002512 Type = type.capitalize()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002513
2514 for name in list(glob.keys()):
2515 if name.startswith('_Test'):
2516 base = glob[name]
2517 if type in base.ALLOWED_TYPES:
2518 newname = 'With' + Type + name[1:]
2519 class Temp(base, unittest.TestCase, Mixin):
2520 pass
2521 result[newname] = Temp
2522 Temp.__name__ = newname
2523 Temp.__module__ = Mixin.__module__
2524 return result
2525
2526#
2527# Create test cases
2528#
2529
2530class ProcessesMixin(object):
2531 TYPE = 'processes'
2532 Process = multiprocessing.Process
2533 locals().update(get_attributes(multiprocessing, (
2534 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2535 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2536 'RawArray', 'current_process', 'active_children', 'Pipe',
Richard Oudkerke41682b2012-06-06 19:04:57 +01002537 'connection', 'JoinableQueue', 'Pool'
Benjamin Petersone711caf2008-06-11 16:44:04 +00002538 )))
2539
2540testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2541globals().update(testcases_processes)
2542
2543
2544class ManagerMixin(object):
2545 TYPE = 'manager'
2546 Process = multiprocessing.Process
2547 manager = object.__new__(multiprocessing.managers.SyncManager)
2548 locals().update(get_attributes(manager, (
2549 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2550 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
Richard Oudkerke41682b2012-06-06 19:04:57 +01002551 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersone711caf2008-06-11 16:44:04 +00002552 )))
2553
2554testcases_manager = create_test_cases(ManagerMixin, type='manager')
2555globals().update(testcases_manager)
2556
2557
2558class ThreadsMixin(object):
2559 TYPE = 'threads'
2560 Process = multiprocessing.dummy.Process
2561 locals().update(get_attributes(multiprocessing.dummy, (
2562 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2563 'Condition', 'Event', 'Value', 'Array', 'current_process',
2564 'active_children', 'Pipe', 'connection', 'dict', 'list',
Richard Oudkerke41682b2012-06-06 19:04:57 +01002565 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersone711caf2008-06-11 16:44:04 +00002566 )))
2567
2568testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2569globals().update(testcases_threads)
2570
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002571class OtherTest(unittest.TestCase):
2572 # TODO: add more tests for deliver/answer challenge.
2573 def test_deliver_challenge_auth_failure(self):
2574 class _FakeConnection(object):
2575 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00002576 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002577 def send_bytes(self, data):
2578 pass
2579 self.assertRaises(multiprocessing.AuthenticationError,
2580 multiprocessing.connection.deliver_challenge,
2581 _FakeConnection(), b'abc')
2582
2583 def test_answer_challenge_auth_failure(self):
2584 class _FakeConnection(object):
2585 def __init__(self):
2586 self.count = 0
2587 def recv_bytes(self, size):
2588 self.count += 1
2589 if self.count == 1:
2590 return multiprocessing.connection.CHALLENGE
2591 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00002592 return b'something bogus'
2593 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002594 def send_bytes(self, data):
2595 pass
2596 self.assertRaises(multiprocessing.AuthenticationError,
2597 multiprocessing.connection.answer_challenge,
2598 _FakeConnection(), b'abc')
2599
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002600#
2601# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2602#
2603
2604def initializer(ns):
2605 ns.test += 1
2606
2607class TestInitializers(unittest.TestCase):
2608 def setUp(self):
2609 self.mgr = multiprocessing.Manager()
2610 self.ns = self.mgr.Namespace()
2611 self.ns.test = 0
2612
2613 def tearDown(self):
2614 self.mgr.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01002615 self.mgr.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002616
2617 def test_manager_initializer(self):
2618 m = multiprocessing.managers.SyncManager()
2619 self.assertRaises(TypeError, m.start, 1)
2620 m.start(initializer, (self.ns,))
2621 self.assertEqual(self.ns.test, 1)
2622 m.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01002623 m.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002624
2625 def test_pool_initializer(self):
2626 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2627 p = multiprocessing.Pool(1, initializer, (self.ns,))
2628 p.close()
2629 p.join()
2630 self.assertEqual(self.ns.test, 1)
2631
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002632#
2633# Issue 5155, 5313, 5331: Test process in processes
2634# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2635#
2636
2637def _ThisSubProcess(q):
2638 try:
2639 item = q.get(block=False)
2640 except pyqueue.Empty:
2641 pass
2642
2643def _TestProcess(q):
2644 queue = multiprocessing.Queue()
2645 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002646 subProc.daemon = True
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002647 subProc.start()
2648 subProc.join()
2649
2650def _afunc(x):
2651 return x*x
2652
2653def pool_in_process():
2654 pool = multiprocessing.Pool(processes=4)
2655 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
Richard Oudkerk225cb8d2012-05-02 19:36:11 +01002656 pool.close()
2657 pool.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002658
2659class _file_like(object):
2660 def __init__(self, delegate):
2661 self._delegate = delegate
2662 self._pid = None
2663
2664 @property
2665 def cache(self):
2666 pid = os.getpid()
2667 # There are no race conditions since fork keeps only the running thread
2668 if pid != self._pid:
2669 self._pid = pid
2670 self._cache = []
2671 return self._cache
2672
2673 def write(self, data):
2674 self.cache.append(data)
2675
2676 def flush(self):
2677 self._delegate.write(''.join(self.cache))
2678 self._cache = []
2679
2680class TestStdinBadfiledescriptor(unittest.TestCase):
2681
2682 def test_queue_in_process(self):
2683 queue = multiprocessing.Queue()
2684 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
2685 proc.start()
2686 proc.join()
2687
2688 def test_pool_in_process(self):
2689 p = multiprocessing.Process(target=pool_in_process)
2690 p.start()
2691 p.join()
2692
2693 def test_flushing(self):
2694 sio = io.StringIO()
2695 flike = _file_like(sio)
2696 flike.write('foo')
2697 proc = multiprocessing.Process(target=lambda: flike.flush())
2698 flike.flush()
2699 assert sio.getvalue() == 'foo'
2700
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002701
2702class TestWait(unittest.TestCase):
2703
2704 @classmethod
2705 def _child_test_wait(cls, w, slow):
2706 for i in range(10):
2707 if slow:
2708 time.sleep(random.random()*0.1)
2709 w.send((i, os.getpid()))
2710 w.close()
2711
2712 def test_wait(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002713 from multiprocessing.connection import wait
2714 readers = []
2715 procs = []
2716 messages = []
2717
2718 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01002719 r, w = multiprocessing.Pipe(duplex=False)
2720 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002721 p.daemon = True
2722 p.start()
2723 w.close()
2724 readers.append(r)
2725 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01002726 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002727
2728 while readers:
2729 for r in wait(readers):
2730 try:
2731 msg = r.recv()
2732 except EOFError:
2733 readers.remove(r)
2734 r.close()
2735 else:
2736 messages.append(msg)
2737
2738 messages.sort()
2739 expected = sorted((i, p.pid) for i in range(10) for p in procs)
2740 self.assertEqual(messages, expected)
2741
2742 @classmethod
2743 def _child_test_wait_socket(cls, address, slow):
2744 s = socket.socket()
2745 s.connect(address)
2746 for i in range(10):
2747 if slow:
2748 time.sleep(random.random()*0.1)
2749 s.sendall(('%s\n' % i).encode('ascii'))
2750 s.close()
2751
2752 def test_wait_socket(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002753 from multiprocessing.connection import wait
2754 l = socket.socket()
2755 l.bind(('', 0))
2756 l.listen(4)
2757 addr = ('localhost', l.getsockname()[1])
2758 readers = []
2759 procs = []
2760 dic = {}
2761
2762 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01002763 p = multiprocessing.Process(target=self._child_test_wait_socket,
2764 args=(addr, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002765 p.daemon = True
2766 p.start()
2767 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01002768 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002769
2770 for i in range(4):
2771 r, _ = l.accept()
2772 readers.append(r)
2773 dic[r] = []
2774 l.close()
2775
2776 while readers:
2777 for r in wait(readers):
2778 msg = r.recv(32)
2779 if not msg:
2780 readers.remove(r)
2781 r.close()
2782 else:
2783 dic[r].append(msg)
2784
2785 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
2786 for v in dic.values():
2787 self.assertEqual(b''.join(v), expected)
2788
2789 def test_wait_slow(self):
2790 self.test_wait(True)
2791
2792 def test_wait_socket_slow(self):
Richard Oudkerk104b3f42012-05-08 16:08:07 +01002793 self.test_wait_socket(True)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002794
2795 def test_wait_timeout(self):
2796 from multiprocessing.connection import wait
2797
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002798 expected = 5
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002799 a, b = multiprocessing.Pipe()
2800
2801 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002802 res = wait([a, b], expected)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002803 delta = time.time() - start
2804
2805 self.assertEqual(res, [])
Richard Oudkerk6dbca362012-05-06 16:46:36 +01002806 self.assertLess(delta, expected * 2)
2807 self.assertGreater(delta, expected * 0.5)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002808
2809 b.send(None)
2810
2811 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002812 res = wait([a, b], 20)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002813 delta = time.time() - start
2814
2815 self.assertEqual(res, [a])
Antoine Pitrou37749772012-03-09 18:40:15 +01002816 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002817
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002818 @classmethod
2819 def signal_and_sleep(cls, sem, period):
2820 sem.release()
2821 time.sleep(period)
2822
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002823 def test_wait_integer(self):
2824 from multiprocessing.connection import wait
2825
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002826 expected = 3
2827 sem = multiprocessing.Semaphore(0)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002828 a, b = multiprocessing.Pipe()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002829 p = multiprocessing.Process(target=self.signal_and_sleep,
2830 args=(sem, expected))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002831
2832 p.start()
2833 self.assertIsInstance(p.sentinel, int)
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002834 self.assertTrue(sem.acquire(timeout=20))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002835
2836 start = time.time()
2837 res = wait([a, p.sentinel, b], expected + 20)
2838 delta = time.time() - start
2839
2840 self.assertEqual(res, [p.sentinel])
Antoine Pitrou37749772012-03-09 18:40:15 +01002841 self.assertLess(delta, expected + 2)
2842 self.assertGreater(delta, expected - 2)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002843
2844 a.send(None)
2845
2846 start = time.time()
2847 res = wait([a, p.sentinel, b], 20)
2848 delta = time.time() - start
2849
2850 self.assertEqual(res, [p.sentinel, b])
Antoine Pitrou37749772012-03-09 18:40:15 +01002851 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002852
2853 b.send(None)
2854
2855 start = time.time()
2856 res = wait([a, p.sentinel, b], 20)
2857 delta = time.time() - start
2858
2859 self.assertEqual(res, [a, p.sentinel, b])
Antoine Pitrou37749772012-03-09 18:40:15 +01002860 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002861
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002862 p.terminate()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002863 p.join()
2864
Richard Oudkerk59d54042012-05-10 16:11:12 +01002865 def test_neg_timeout(self):
2866 from multiprocessing.connection import wait
2867 a, b = multiprocessing.Pipe()
2868 t = time.time()
2869 res = wait([a], timeout=-1)
2870 t = time.time() - t
2871 self.assertEqual(res, [])
2872 self.assertLess(t, 1)
2873 a.close()
2874 b.close()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002875
Antoine Pitrou709176f2012-04-01 17:19:09 +02002876#
2877# Issue 14151: Test invalid family on invalid environment
2878#
2879
2880class TestInvalidFamily(unittest.TestCase):
2881
2882 @unittest.skipIf(WIN32, "skipped on Windows")
2883 def test_invalid_family(self):
2884 with self.assertRaises(ValueError):
2885 multiprocessing.connection.Listener(r'\\.\test')
2886
Antoine Pitrou6d20cba2012-04-03 20:12:23 +02002887 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
2888 def test_invalid_family_win32(self):
2889 with self.assertRaises(ValueError):
2890 multiprocessing.connection.Listener('/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02002891
Richard Oudkerk77c84f22012-05-18 14:28:02 +01002892#
2893# Issue 12098: check sys.flags of child matches that for parent
2894#
2895
2896class TestFlags(unittest.TestCase):
2897 @classmethod
2898 def run_in_grandchild(cls, conn):
2899 conn.send(tuple(sys.flags))
2900
2901 @classmethod
2902 def run_in_child(cls):
2903 import json
2904 r, w = multiprocessing.Pipe(duplex=False)
2905 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
2906 p.start()
2907 grandchild_flags = r.recv()
2908 p.join()
2909 r.close()
2910 w.close()
2911 flags = (tuple(sys.flags), grandchild_flags)
2912 print(json.dumps(flags))
2913
2914 def test_flags(self):
2915 import json, subprocess
2916 # start child process using unusual flags
2917 prog = ('from test.test_multiprocessing import TestFlags; ' +
2918 'TestFlags.run_in_child()')
2919 data = subprocess.check_output(
2920 [sys.executable, '-E', '-S', '-O', '-c', prog])
2921 child_flags, grandchild_flags = json.loads(data.decode('ascii'))
2922 self.assertEqual(child_flags, grandchild_flags)
2923
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002924testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
Richard Oudkerk77c84f22012-05-18 14:28:02 +01002925 TestStdinBadfiledescriptor, TestWait, TestInvalidFamily,
2926 TestFlags]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002927
Benjamin Petersone711caf2008-06-11 16:44:04 +00002928#
2929#
2930#
2931
2932def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00002933 if sys.platform.startswith("linux"):
2934 try:
2935 lock = multiprocessing.RLock()
2936 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00002937 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00002938
Charles-François Natali221ef672011-11-22 18:55:22 +01002939 check_enough_semaphores()
2940
Benjamin Petersone711caf2008-06-11 16:44:04 +00002941 if run is None:
2942 from test.support import run_unittest as run
2943
2944 util.get_temp_dir() # creates temp directory for use by all processes
2945
2946 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2947
Benjamin Peterson41181742008-07-02 20:22:54 +00002948 ProcessesMixin.pool = multiprocessing.Pool(4)
2949 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2950 ManagerMixin.manager.__init__()
2951 ManagerMixin.manager.start()
2952 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002953
2954 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00002955 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2956 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002957 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2958 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00002959 )
2960
2961 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2962 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Richard Oudkerk225cb8d2012-05-02 19:36:11 +01002963 try:
2964 run(suite)
2965 finally:
2966 ThreadsMixin.pool.terminate()
2967 ProcessesMixin.pool.terminate()
2968 ManagerMixin.pool.terminate()
2969 ManagerMixin.pool.join()
2970 ManagerMixin.manager.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01002971 ManagerMixin.manager.join()
Richard Oudkerk225cb8d2012-05-02 19:36:11 +01002972 ThreadsMixin.pool.join()
2973 ProcessesMixin.pool.join()
2974 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002975
2976def main():
2977 test_main(unittest.TextTestRunner(verbosity=2).run)
2978
2979if __name__ == '__main__':
2980 main()