blob: 65e7b0be6ebea73c8cb5ab23681bd88f9b330690 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Antoine Pitroude911b22011-12-21 11:03:24 +010011import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020015import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import signal
17import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import socket
19import random
20import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000021import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000022
Benjamin Petersone5384b02008-10-04 22:00:42 +000023
R. David Murraya21e4ca2009-03-31 23:16:50 +000024# Skip tests if _multiprocessing wasn't built.
25_multiprocessing = test.support.import_module('_multiprocessing')
26# Skip tests if sem_open implementation is broken.
27test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
31import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000032
Benjamin Petersone711caf2008-06-11 16:44:04 +000033import multiprocessing.dummy
34import multiprocessing.connection
35import multiprocessing.managers
36import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000037import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000038
Charles-François Natalibc8f0822011-09-20 20:36:51 +020039from multiprocessing import util
40
41try:
42 from multiprocessing import reduction
43 HAS_REDUCTION = True
44except ImportError:
45 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000046
Brian Curtinafa88b52010-10-07 01:12:19 +000047try:
48 from multiprocessing.sharedctypes import Value, copy
49 HAS_SHAREDCTYPES = True
50except ImportError:
51 HAS_SHAREDCTYPES = False
52
Antoine Pitroubcb39d42011-08-23 19:46:22 +020053try:
54 import msvcrt
55except ImportError:
56 msvcrt = None
57
Benjamin Petersone711caf2008-06-11 16:44:04 +000058#
59#
60#
61
def latin(s):
    """Return the string *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000064
Benjamin Petersone711caf2008-06-11 16:44:04 +000065#
66# Constants
67#
68
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

DELTA = 0.1

# Setting CHECK_TIMINGS to true makes the tests take a lot longer and can
# sometimes cause some non-serious failures, because some calls block a
# bit longer than expected.
CHECK_TIMINGS = False

# Longer timeouts are only used when timings are actually verified.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# True unless _multiprocessing flags its semaphore get_value() as broken.
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = sys.platform == "win32"
Antoine Pitrou176f07d2011-06-06 19:35:31 +020086
Richard Oudkerk59d54042012-05-10 16:11:12 +010087from multiprocessing.connection import wait
Antoine Pitrou176f07d2011-06-06 19:35:31 +020088
def wait_for_handle(handle, timeout):
    """Wait on a single *handle* using multiprocessing.connection.wait().

    A negative *timeout* is replaced by None before being handed to
    wait(); any other value (including None) is passed through as is.
    Returns whatever wait() returns.
    """
    is_negative = timeout is not None and timeout < 0.0
    effective_timeout = None if is_negative else timeout
    return wait([handle], effective_timeout)
Jesse Noller6214edd2009-01-19 16:23:53 +000093
# MAXFD is the system's SC_OPEN_MAX setting, or 256 when it cannot be read.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # os.sysconf is missing on this platform (AttributeError), the name is
    # not recognised (ValueError) or the query failed (OSError).  Catching
    # only these keeps KeyboardInterrupt/SystemExit from being swallowed.
    MAXFD = 256
98
Benjamin Petersone711caf2008-06-11 16:44:04 +000099#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000100# Some tests require ctypes
101#
102
103try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000104 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000105except ImportError:
106 Structure = object
107 c_int = c_double = None
108
Charles-François Natali221ef672011-11-22 18:55:22 +0100109
def check_enough_semaphores():
    """Check that the system supports enough semaphores to run the test.

    Raises unittest.SkipTest when the OS reports a semaphore limit below
    the required minimum; otherwise returns None.
    """
    # minimum number of semaphores available according to POSIX
    required = 256
    try:
        available = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    # A reported value of -1 is accepted just like a sufficient count.
    if available != -1 and available < required:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % required)
123
124
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000125#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000126# Creates a wrapper for a function which records the time it takes to finish
127#
128
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    After a call, ``elapsed`` holds the duration in seconds of the most
    recent call; it is ``None`` until the first call.  The wrapped
    function's return value or exception is passed through unchanged.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        # Use the monotonic clock: time.time() can jump when the system
        # clock is adjusted, which would corrupt the measured interval.
        start = time.monotonic()
        try:
            return self.func(*args, **kwds)
        finally:
            # Recorded even when the wrapped call raises.
            self.elapsed = time.monotonic() - start
141
142#
143# Base class for test cases
144#
145
class BaseTestCase(object):
    """Helper assertions shared by the test case classes below."""

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare two durations to one decimal place.

        Does nothing unless CHECK_TIMINGS is enabled.
        """
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert that ``func(*args)`` equals *value*.

        A NotImplementedError raised by the call is tolerated, in which
        case nothing is asserted.
        """
        try:
            outcome = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, outcome)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
168
Benjamin Petersone711caf2008-06-11 16:44:04 +0000169#
170# Return the value of a semaphore
171#
172
def get_value(self):
    """Return the value of the semaphore-like object *self*.

    The object's own get_value() method is tried first; if that raises
    AttributeError, the ``_Semaphore__value`` and then ``_value``
    attributes are tried in turn.  Raises NotImplementedError when none
    of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
184
185#
186# Testcases
187#
188
class _TestProcess(BaseTestCase):
    """Tests for Process objects (and their thread-based stand-ins).

    NOTE(review): TYPE, Process, current_process, active_children, Queue,
    Event and Pipe are not defined in this class; presumably a mixin
    elsewhere in the file supplies them -- confirm.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertGreater(len(authkey), 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertIsNone(current.exitcode)

    def test_daemon_argument(self):
        if self.TYPE == "threads":
            return

        # By default uses the current process's daemon flag.
        proc0 = self.Process(target=self._test)
        self.assertEqual(proc0.daemon, self.current_process().daemon)
        proc1 = self.Process(target=self._test, daemon=True)
        self.assertTrue(proc1.daemon)
        proc2 = self.Process(target=self._test, daemon=False)
        self.assertFalse(proc2.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child target: report arguments and identity back through *q*.

        Puts, in order: the positional args, the keyword args, the current
        process name and -- except for the 'threads' type -- the authkey
        (as bytes) and the pid.
        """
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # State before the child is started.
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIs(type(self.active_children()), list)
        self.assertIsNone(p.exitcode)

        p.start()

        # State while the child is running.
        self.assertIsNone(p.exitcode)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child echoes what it received, in the order _test() puts it.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # State after the child has finished.
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        """Child target that sleeps far longer than the test runs."""
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertIsNone(p.exitcode)

        join = TimingWrapper(p.join)

        # Joining with a zero or negative timeout returns None and leaves
        # the still-sleeping child alive.
        for timeout in (0, -1):
            self.assertIsNone(join(timeout))
            self.assertTimingAlmostEqual(join.elapsed, 0.0)
            self.assertEqual(p.is_alive(), True)

        p.terminate()

        self.assertIsNone(join())
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIs(type(cpus), int)
        self.assertGreaterEqual(cpus, 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Send *id* on *wconn*, then spawn two children one level deeper.

        *id* is a list of ints; recursion stops once it has two elements.
        Each child is joined before the next one is started.
        """
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sentinel(cls, event):
        """Child target: wait up to ten seconds for *event* to be set."""
        event.wait(10.0)

    def test_sentinel(self):
        if self.TYPE == "threads":
            return
        event = self.Event()
        p = self.Process(target=self._test_sentinel, args=(event,))
        # Accessing the sentinel before start() is expected to fail.
        with self.assertRaises(ValueError):
            p.sentinel
        p.start()
        self.addCleanup(p.join)
        sentinel = p.sentinel
        self.assertIsInstance(sentinel, int)
        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
        event.set()
        p.join()
        self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
379
Benjamin Petersone711caf2008-06-11 16:44:04 +0000380#
381#
382#
383
class _UpperCaser(multiprocessing.Process):
    """Process that upper-cases the strings sent to it over a pipe."""

    def __init__(self):
        multiprocessing.Process.__init__(self)
        ends = multiprocessing.Pipe()
        self.child_conn, self.parent_conn = ends

    def run(self):
        # Runs in the child: serve requests until None is received.
        self.parent_conn.close()
        requests = iter(self.child_conn.recv, None)
        for text in requests:
            self.child_conn.send(text.upper())
        self.child_conn.close()

    def submit(self, s):
        """Send the str *s* to the child and return its reply."""
        assert type(s) is str
        conn = self.parent_conn
        conn.send(s)
        return conn.recv()

    def stop(self):
        """Send the None stop marker, then close both connection ends."""
        self.parent_conn.send(None)
        for conn in (self.parent_conn, self.child_conn):
            conn.close()
405
class _TestSubclassingProcess(BaseTestCase):
    """Tests for Process subclasses and for child-process shutdown."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.daemon = True
        worker.start()
        for text, shouted in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), shouted)
        worker.stop()
        worker.join()

    def test_stderr_flush(self):
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            return

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)
        child = self.Process(target=self._test_stderr_flush, args=(testfn,))
        child.start()
        child.join()
        with open(testfn, 'r') as f:
            err = f.read()
            # The whole traceback was printed
            for fragment in ("ZeroDivisionError",
                             "test_multiprocessing.py",
                             "1/0 # MARKER"):
                self.assertIn(fragment, err)

    @classmethod
    def _test_stderr_flush(cls, testfn):
        # Redirect stderr to the file, then fail so a traceback is written.
        sys.stderr = open(testfn, 'w')
        1/0 # MARKER


    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        # Redirect stderr to the file, then exit with *reason*.
        sys.stderr = open(testfn, 'w')
        sys.exit(reason)

    def test_sys_exit(self):
        # See Issue 13854
        if self.TYPE == 'threads':
            return

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)

        # Non-integer exit reasons: check both the exit code and the text
        # that ends up in the redirected stderr file.
        cases = (([1, 2, 3], 1), ('ignore this', 0))
        for reason, code in cases:
            child = self.Process(target=self._test_sys_exit,
                                 args=(reason, testfn))
            child.daemon = True
            child.start()
            child.join(5)
            self.assertEqual(child.exitcode, code)

            with open(testfn, 'r') as f:
                self.assertEqual(f.read().rstrip(), str(reason))

        # Boolean/integer reasons are compared directly with the exit code.
        for reason in (True, False, 8):
            child = self.Process(target=sys.exit, args=(reason,))
            child.daemon = True
            child.start()
            child.join(5)
            self.assertEqual(child.exitcode, reason)
471
Benjamin Petersone711caf2008-06-11 16:44:04 +0000472#
473#
474#
475
def queue_empty(q):
    """Return q.empty() if *q* has that method, else ``q.qsize() == 0``."""
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
481
def queue_full(q, maxsize):
    """Return q.full() if *q* has that method, else ``q.qsize() == maxsize``."""
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
487
488
class _TestQueue(BaseTestCase):
    """Tests for Queue and JoinableQueue objects.

    NOTE(review): Queue, JoinableQueue, Event and Process are not defined
    in this class; presumably a mixin elsewhere in the file supplies them
    -- confirm.
    """

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_put: once released by the parent, take six
        # items off the queue and then let the parent continue.
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue using every calling convention of put().
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Non-blocking puts on the full queue are expected to fail with no
        # measurable delay.
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        # Blocking puts are expected to fail after their timeout; a
        # timeout combined with block=False is expected to be immediate.
        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        # Let the child drain the queue, then re-check its state.
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_get: once released, queue the items the
        # parent expects and then let the parent continue.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Non-blocking gets on the empty queue are expected to fail with
        # no measurable delay.
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        # Blocking gets are expected to fail after their timeout; a
        # timeout combined with block=False is expected to be immediate.
        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() is not available here, so there is nothing to check.
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker: acknowledge each item after a short delay; a None item
        # ends the loop.
        for _ in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for _ in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        # Returns once every queued item has been acknowledged.
        queue.join()

        # One stop marker per worker.
        for _ in workers:
            queue.put(None)

        for p in workers:
            p.join()
698
699#
700#
701#
702
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock objects."""

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # A second, non-blocking acquire of the held lock must fail.
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # Releasing a lock that is not held is an error.
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        depth = 3
        # The same owner may acquire repeatedly ...
        for _ in range(depth):
            self.assertEqual(lock.acquire(), True)
        # ... and must release the same number of times.
        for _ in range(depth):
            self.assertEqual(lock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        # A lock can be used as a context manager.
        with self.Lock():
            pass
725
Benjamin Petersone711caf2008-06-11 16:44:04 +0000726
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore objects."""

    def _test_semaphore(self, sem):
        # Shared scenario for a semaphore created with value 2: take it
        # down to zero, fail a non-blocking acquire, then release twice.
        # The counter is checked after every step where readable.
        check = self.assertReturnsIfImplemented
        check(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        check(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        check(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        check(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # An unbounded semaphore can be released past its initial value.
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # Non-blocking attempts are expected to fail immediately, with or
        # without a timeout argument.
        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        # Blocking attempts are expected to fail after the timeout.
        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
779
780
781class _TestCondition(BaseTestCase):
782
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000783 @classmethod
784 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000785 cond.acquire()
786 sleeping.release()
787 cond.wait(timeout)
788 woken.release()
789 cond.release()
790
791 def check_invariant(self, cond):
792 # this is only supposed to succeed when there are no sleepers
793 if self.TYPE == 'processes':
794 try:
795 sleepers = (cond._sleeping_count.get_value() -
796 cond._woken_count.get_value())
797 self.assertEqual(sleepers, 0)
798 self.assertEqual(cond._wait_semaphore.get_value(), 0)
799 except NotImplementedError:
800 pass
801
802 def test_notify(self):
803 cond = self.Condition()
804 sleeping = self.Semaphore(0)
805 woken = self.Semaphore(0)
806
807 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000808 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000809 p.start()
810
811 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000812 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000813 p.start()
814
815 # wait for both children to start sleeping
816 sleeping.acquire()
817 sleeping.acquire()
818
819 # check no process/thread has woken up
820 time.sleep(DELTA)
821 self.assertReturnsIfImplemented(0, get_value, woken)
822
823 # wake up one process/thread
824 cond.acquire()
825 cond.notify()
826 cond.release()
827
828 # check one process/thread has woken up
829 time.sleep(DELTA)
830 self.assertReturnsIfImplemented(1, get_value, woken)
831
832 # wake up another
833 cond.acquire()
834 cond.notify()
835 cond.release()
836
837 # check other has woken up
838 time.sleep(DELTA)
839 self.assertReturnsIfImplemented(2, get_value, woken)
840
841 # check state is not mucked up
842 self.check_invariant(cond)
843 p.join()
844
845 def test_notify_all(self):
846 cond = self.Condition()
847 sleeping = self.Semaphore(0)
848 woken = self.Semaphore(0)
849
850 # start some threads/processes which will timeout
851 for i in range(3):
852 p = self.Process(target=self.f,
853 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000854 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000855 p.start()
856
857 t = threading.Thread(target=self.f,
858 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000859 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000860 t.start()
861
862 # wait for them all to sleep
863 for i in range(6):
864 sleeping.acquire()
865
866 # check they have all timed out
867 for i in range(6):
868 woken.acquire()
869 self.assertReturnsIfImplemented(0, get_value, woken)
870
871 # check state is not mucked up
872 self.check_invariant(cond)
873
874 # start some more threads/processes
875 for i in range(3):
876 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000877 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000878 p.start()
879
880 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000881 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000882 t.start()
883
884 # wait for them to all sleep
885 for i in range(6):
886 sleeping.acquire()
887
888 # check no process/thread has woken up
889 time.sleep(DELTA)
890 self.assertReturnsIfImplemented(0, get_value, woken)
891
892 # wake them all up
893 cond.acquire()
894 cond.notify_all()
895 cond.release()
896
897 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200898 for i in range(10):
899 try:
900 if get_value(woken) == 6:
901 break
902 except NotImplementedError:
903 break
904 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000905 self.assertReturnsIfImplemented(6, get_value, woken)
906
907 # check state is not mucked up
908 self.check_invariant(cond)
909
910 def test_timeout(self):
911 cond = self.Condition()
912 wait = TimingWrapper(cond.wait)
913 cond.acquire()
914 res = wait(TIMEOUT1)
915 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000916 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000917 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
918
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200919 @classmethod
920 def _test_waitfor_f(cls, cond, state):
921 with cond:
922 state.value = 0
923 cond.notify()
924 result = cond.wait_for(lambda : state.value==4)
925 if not result or state.value != 4:
926 sys.exit(1)
927
@unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
def test_waitfor(self):
    # Modelled on the wait_for test in test/lock_tests.py: the child
    # resets the shared counter to 0, the parent bumps it four times,
    # and the child must observe the value 4 and exit cleanly.
    cond = self.Condition()
    state = self.Value('i', -1)

    child = self.Process(target=self._test_waitfor_f, args=(cond, state))
    child.daemon = True
    child.start()

    def counter_reset():
        return state.value == 0

    with cond:
        became_zero = cond.wait_for(counter_reset)
        self.assertTrue(became_zero)
        self.assertEqual(state.value, 0)

    for _ in range(4):
        time.sleep(0.01)
        with cond:
            state.value = state.value + 1
            cond.notify()

    child.join(5)
    self.assertFalse(child.is_alive())
    self.assertEqual(child.exitcode, 0)
952
@classmethod
def _test_waitfor_timeout_f(cls, cond, state, success, sem):
    """Child side of test_waitfor_timeout.

    Releases sem to signal that it is running, then waits up to 0.1s for
    state.value to become 4.  success.value is set to True only when the
    wait timed out and the elapsed time is within the accepted window.
    """
    sem.release()
    expected = 0.1
    with cond:
        started = time.time()
        result = cond.wait_for(lambda: state.value == 4, timeout=expected)
        elapsed = time.time() - started
        if result:
            # The predicate became true: this is not the timeout we want.
            return
        # Same tolerance as assertTimeout() in test/lock_tests.py.
        if expected * 0.6 < elapsed < expected * 10.0:
            success.value = True
964
@unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
def test_waitfor_timeout(self):
    # Modelled on the wait_for timeout test in test/lock_tests.py: the
    # child waits for a value the parent never produces and records in
    # `success` whether its wait timed out as expected.
    cond = self.Condition()
    state = self.Value('i', 0)
    success = self.Value('i', False)
    sem = self.Semaphore(0)

    worker_args = (cond, state, success, sem)
    child = self.Process(target=self._test_waitfor_timeout_f,
                         args=worker_args)
    child.daemon = True
    child.start()
    # The child releases the semaphore as its first action.
    self.assertTrue(sem.acquire(timeout=10))

    # Three increments only: the counter stops at 3 and never reaches 4.
    for _ in range(3):
        time.sleep(0.01)
        with cond:
            state.value = state.value + 1
            cond.notify()

    child.join(5)
    self.assertTrue(success.value)
988
@classmethod
def _test_wait_result(cls, c, pid):
    """Notify condition c, then after one second send SIGINT to pid.

    Nothing is signalled when pid is None.
    """
    with c:
        c.notify()
    time.sleep(1)
    if pid is None:
        return
    os.kill(pid, signal.SIGINT)
996
def test_wait_result(self):
    # wait() returns False on timeout and True once notified; for real
    # processes on non-Windows platforms, a SIGINT sent by the helper
    # must surface from wait() as KeyboardInterrupt.
    can_signal = (isinstance(self, ProcessesMixin)
                  and sys.platform != 'win32')
    pid = os.getpid() if can_signal else None

    c = self.Condition()
    with c:
        for timeout in (0, 0.1):
            self.assertFalse(c.wait(timeout))

        notifier = self.Process(target=self._test_wait_result,
                                args=(c, pid))
        notifier.start()

        self.assertTrue(c.wait(10))
        if pid is not None:
            with self.assertRaises(KeyboardInterrupt):
                c.wait(10)

        notifier.join()
1016
Benjamin Petersone711caf2008-06-11 16:44:04 +00001017
class _TestEvent(BaseTestCase):
    """Tests for Event: is_set(), wait() return values and wait() timing."""

    @classmethod
    def _test_event(cls, event):
        """Set the event after sleeping for TIMEOUT2 seconds."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A fresh event starts out cleared.
        self.assertEqual(event.is_set(), False)

        # While the event is cleared, wait() returns False: immediately
        # for a zero timeout, and after about TIMEOUT1 seconds otherwise.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once the event is set, wait() returns True without blocking,
        # with or without a timeout.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)

        event.clear()

        # The helper sets the event after TIMEOUT2 seconds, which must
        # wake up the untimed wait() below.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # Reap the helper so it does not linger into later tests.
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001058
1059#
1060#
1061#
1062
class _TestValue(BaseTestCase):
    """Tests for shared ctypes scalars (Value / RawValue)."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        """Overwrite every shared value with the third item of its row."""
        for shared, (_code, _initial, final) in zip(values, cls.codes_values):
            shared.value = final

    def test_value(self, raw=False):
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        # Each object starts out holding its initial value ...
        for shared, (_code, initial, _final) in zip(values, self.codes_values):
            self.assertEqual(shared.value, initial)

        child = self.Process(target=self._test, args=(values,))
        child.daemon = True
        child.start()
        child.join()

        # ... and holds the child's value once the child has finished.
        for shared, (_code, _initial, final) in zip(values, self.codes_values):
            self.assertEqual(shared.value, final)

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # Default locking: both accessors must be callable.
        default_locked = self.Value('i', 5)
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None is accepted and still provides both accessors.
        none_locked = self.Value('i', 5, lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # A caller-supplied lock is handed back by get_lock().
        lock = self.Lock()
        custom_locked = self.Value('i', 5, lock=lock)
        returned_lock = custom_locked.get_lock()
        custom_locked.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False yields an object without the accessors.
        unlocked = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))

        # Anything that is not a usable lock is rejected.
        with self.assertRaises(AttributeError):
            self.Value('i', 5, lock='navalue')

        # Raw values have no accessors either.
        raw_value = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw_value, accessor))
1130
Benjamin Petersone711caf2008-06-11 16:44:04 +00001131
class _TestArray(BaseTestCase):
    """Tests for shared ctypes arrays (Array / RawArray)."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        """Replace seq, in place, by its running sums."""
        for idx in range(1, len(seq)):
            seq[idx] = seq[idx] + seq[idx - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make_array = self.RawArray if raw else self.Array
        arr = make_array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Apply the same slice assignment to the shared array and the list.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Compute the expected running sums locally, then let a child
        # process do the same to the shared array.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).  Repeating the allocation
        # raises the odds that the 2nd and 3rd arrays reuse memory that
        # previously held non-zero data.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(size)
            self.assertEqual(list(arr), list(range(size)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default locking: both accessors must be callable.
        default_locked = self.Array('i', list(range(10)))
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None is accepted and still provides both accessors.
        none_locked = self.Array('i', list(range(10)), lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # A caller-supplied lock is handed back by get_lock().
        lock = self.Lock()
        custom_locked = self.Array('i', list(range(10)), lock=lock)
        returned_lock = custom_locked.get_lock()
        custom_locked.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False yields an object without the accessors.
        unlocked = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))

        # Anything that is not a usable lock is rejected.
        with self.assertRaises(AttributeError):
            self.Array('i', range(10), lock='notalock')

        # Raw arrays have no accessors either.
        raw_array = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw_array, accessor))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001210
1211#
1212#
1213#
1214
class _TestContainers(BaseTestCase):
    """Tests for the manager-backed list, dict and Namespace objects."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        ten = list(range(10))
        five = list(range(5))
        five_twice = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]

        a = self.list(list(range(10)))
        self.assertEqual(a[:], ten)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], five)

        # Indexing, and slicing past the end of the list.
        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2, 3, 4])

        b *= 2
        self.assertEqual(b[:], five_twice)

        self.assertEqual(b + [5, 6], five_twice + [5, 6])

        # None of the operations on b may have touched a.
        self.assertEqual(a[:], ten)

        nested = self.list([a, b])
        self.assertEqual(
            nested[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], five_twice]
            )

        # A change to a must be visible through the list that contains it.
        wrapper = self.list([a])
        a.append('hello')
        self.assertEqual(wrapper[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            d[code] = chr(code)
        letters = [chr(code) for code in indices]
        self.assertEqual(d.copy(), {code: chr(code) for code in indices})
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), list(zip(indices, letters)))

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1270
1271#
1272#
1273#
1274
def sqr(x, wait=0.0):
    """Return x squared, after first sleeping for `wait` seconds."""
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001278
def mul(x, y):
    """Return the product of x and y."""
    product = x * y
    return product
1281
class _TestPool(BaseTestCase):
    """Tests for the pool API: apply, map, starmap, imap and async variants."""

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))),
                         list(map(sqr, list(range(10)))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, list(range(100)))))

    def test_starmap(self):
        psmap = self.pool.starmap
        tuples = list(zip(range(10), range(9, -1, -1)))
        self.assertEqual(psmap(mul, tuples),
                         list(itertools.starmap(mul, tuples)))
        tuples = list(zip(range(100), range(99, -1, -1)))
        self.assertEqual(psmap(mul, tuples, chunksize=20),
                         list(itertools.starmap(mul, tuples)))

    def test_starmap_async(self):
        tuples = list(zip(range(100), range(99, -1, -1)))
        self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
                         list(itertools.starmap(mul, tuples)))

    def test_map_chunksize(self):
        # An empty iterable with an explicit chunksize must complete
        # instead of stalling until the timeout.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout passed to get().
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        # Results may arrive in any order, so compare them sorted.
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

    def test_make_pool(self):
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Shut the workers down even when the assertion fails.
            p.close()
            p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.  Report it as skipped rather than
            # letting it show up as a pass.
            self.skipTest("terminating workers would leak manager references")

        result = self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        # terminate() must not wait for the queued sleeps to finish.
        self.assertLess(join.elapsed, 0.5)

    def test_empty_iterable(self):
        # See Issue 12157
        p = self.Pool(1)
        try:
            self.assertEqual(p.map(sqr, []), [])
            self.assertEqual(list(p.imap(sqr, [])), [])
            self.assertEqual(list(p.imap_unordered(sqr, [])), [])
            self.assertEqual(p.map_async(sqr, []).get(), [])
        finally:
            # Shut the worker down even when an assertion fails.
            p.close()
            p.join()
1386
def raising():
    """Always raise KeyError("key")."""
    error = KeyError("key")
    raise error
Jesse Noller1f0b6582010-01-27 03:36:01 +00001389
def unpickleable_result():
    """Return a lambda that evaluates to 42."""
    answer = lambda: 42
    return answer
1392
class _TestPoolWorkerErrors(BaseTestCase):
    """Tests for how a pool reports errors that occur in its workers."""

    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        pool = multiprocessing.Pool(2)

        received = None

        def remember(exc):
            nonlocal received
            received = exc

        res = pool.apply_async(raising, error_callback=remember)
        self.assertRaises(KeyError, res.get)
        # The error callback must have been handed the same kind of error.
        self.assertTrue(received)
        self.assertIsInstance(received, KeyError)

        pool.close()
        pool.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        pool = multiprocessing.Pool(2)

        received = None

        def remember(exc):
            nonlocal received
            received = exc

        # Make sure we don't lose pool processes because of encoding errors.
        for _ in range(20):
            received = None

            res = pool.apply_async(unpickleable_result,
                                   error_callback=remember)
            self.assertRaises(MaybeEncodingError, res.get)
            wrapped = received
            self.assertTrue(wrapped)
            self.assertIsInstance(received, MaybeEncodingError)
            self.assertIsNotNone(wrapped.exc)
            self.assertIsNotNone(wrapped.value)

        pool.close()
        pool.join()
1432
class _TestPoolWorkerLifetime(BaseTestCase):
    """Tests for pools whose workers are limited by maxtasksperchild."""

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        pids_before = [worker.pid for worker in pool._pool]
        # Submit many tasks so that each worker (hopefully) gets replaced.
        pending = [pool.apply_async(sqr, (i, )) for i in range(100)]
        # Collecting every result checks the answers and also guarantees
        # that all tasks have completed.
        for arg, res in enumerate(pending):
            self.assertEqual(res.get(), sqr(arg))
        # Refill the pool
        pool._repopulate_pool()
        # Give the new workers up to 50 * DELTA seconds to come alive.
        for _ in range(50):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)
        pids_after = [worker.pid for worker in pool._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, pids_before)
        self.assertNotIn(None, pids_after)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(pids_before), sorted(pids_after))
        pool.close()
        pool.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        pool = multiprocessing.Pool(3, maxtasksperchild=1)
        pending = [pool.apply_async(sqr, (i, 0.3)) for i in range(6)]
        pool.close()
        pool.join()
        # Every task must still have produced its result.
        for arg, res in enumerate(pending):
            self.assertEqual(res.get(), sqr(arg))
1477
1478
Benjamin Petersone711caf2008-06-11 16:44:04 +00001479#
1480# Test that manager has expected number of shared objects left
1481#
1482
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Fetch the debug info straight after the count so that both
        # describe the same moment.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the snapshot once; asking the manager again would only
            # duplicate the output.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1501
1502#
1503# Test of creating a customized manager class
1504#
1505
1506from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1507
class FooBar:
    """Plain object with two public methods and one private method."""

    def f(self):
        """Return the string 'f()'."""
        label = 'f()'
        return label

    def g(self):
        """Always raise ValueError."""
        raise ValueError

    def _h(self):
        """Return the string '_h()'."""
        label = '_h()'
        return label
1515
def baz():
    """Generate the squares of 0 through 9."""
    squares = (n * n for n in range(10))
    yield from squares
1519
class IteratorProxy(BaseProxy):
    """Proxy that is its own iterator and forwards __next__ to the referent."""

    _exposed_ = ('__next__',)

    def __iter__(self):
        return self

    def __next__(self):
        # Each step is one '__next__' call on the proxied object.
        item = self._callmethod('__next__')
        return item
1526
class MyManager(BaseManager):
    """Customized manager; its shared types are registered just below."""


# 'Foo' gets the default set of exposed methods.
MyManager.register('Foo', callable=FooBar)
# 'Bar' exposes exactly f() and _h().
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
# 'baz' results are handed out through IteratorProxy.
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1533
1534
class _TestMyManager(BaseTestCase):
    """Tests for the customized manager class MyManager."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        # Always stop the manager process, even when an assertion fails,
        # so that a failing run does not leave a server process behind.
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            baz = manager.baz()

            # Which of the referent's methods each proxy makes available.
            foo_methods = [name for name in ('f', 'g', '_h')
                           if hasattr(foo, name)]
            bar_methods = [name for name in ('f', 'g', '_h')
                           if hasattr(bar, name)]

            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            # '_h' is not exposed on Foo, so the call is refused remotely.
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(baz), [i*i for i in range(10)])
        finally:
            manager.shutdown()
1566
1567#
1568# Test of connecting to a remote server and using xmlrpclib for serialization
1569#
1570
# Single module-level queue handed out by get_queue().
_queue = pyqueue.Queue()


def get_queue():
    """Return the module-level queue (the same object on every call)."""
    return _queue
1574
class QueueManager(BaseManager):
    '''manager class used by server process'''


# Calls to get_queue on this manager are served by the get_queue() function.
QueueManager.register('get_queue', callable=get_queue)
1578
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''


# Registered by name only: no callable is supplied for this manager.
QueueManager2.register('get_queue')
1582
1583
# Serializer name passed to the queue managers in the remote-manager tests.
SERIALIZER = 'xmlrpclib'
1585
class _TestRemoteManager(BaseTestCase):
    """Tests for connecting to a running manager from other manager objects."""

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        """Connect to the manager at `address` and put one tuple on its queue."""
        client = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        shared_queue = client.get_queue()
        shared_queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)
        credentials = dict(authkey=authkey, serializer=SERIALIZER)

        manager = QueueManager(address=('localhost', 0), **credentials)
        manager.start()

        producer = self.Process(target=self._putter,
                                args=(manager.address, authkey))
        producer.daemon = True
        producer.start()

        manager2 = QueueManager2(address=manager.address, **credentials)
        manager2.connect()
        shared_queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        received = shared_queue.get()
        self.assertEqual(received, ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        with self.assertRaises(Exception):
            shared_queue.put(time.sleep)

        # Make queue finalizer run before the server is stopped
        del shared_queue
        manager.shutdown()
1627
class _TestManagerRestart(BaseTestCase):
    """Tests that a manager can be restarted right away on the same address."""

    @classmethod
    def _putter(cls, address, authkey):
        """Connect to the manager at `address` and put a greeting on its queue."""
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        del queue
        manager.shutdown()

        # Bring a second manager up on the address that was just used.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except IOError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # The replacement must be started too: shutdown() only exists
            # on a manager that has been started, so without this the
            # call below would fail with AttributeError.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001669
Benjamin Petersone711caf2008-06-11 16:44:04 +00001670#
1671#
1672#
1673
# Empty message that tells the echo helper in the connection tests to stop.
SENTINEL = latin('')
1675
1676class _TestConnection(BaseTestCase):
1677
1678 ALLOWED_TYPES = ('processes', 'threads')
1679
@classmethod
def _echo(cls, conn):
    """Send back every message received on conn until SENTINEL arrives.

    The connection is closed once the sentinel has been read.
    """
    while True:
        payload = conn.recv_bytes()
        if payload == SENTINEL:
            break
        conn.send_bytes(payload)
    conn.close()
1685
def test_connection(self):
    # Round-trip objects and raw bytes through an echoing child, then
    # check poll() timing and what happens once the child has gone away.
    conn, child_conn = self.Pipe()

    echoer = self.Process(target=self._echo, args=(child_conn,))
    echoer.daemon = True
    echoer.start()

    uses_processes = self.TYPE == 'processes'
    seq = [1, 2.25, None]
    msg = latin('hello world')
    longmsg = msg * 10
    arr = array.array('i', list(range(4)))

    if uses_processes:
        self.assertEqual(type(conn.fileno()), int)

    # A picklable object, then a plain byte string.
    self.assertEqual(conn.send(seq), None)
    self.assertEqual(conn.recv(), seq)

    self.assertEqual(conn.send_bytes(msg), None)
    self.assertEqual(conn.recv_bytes(), msg)

    if uses_processes:
        # Receive into the start of a zeroed array.
        target = array.array('i', [0]*10)
        padded = list(arr) + [0] * (10 - len(arr))
        self.assertEqual(conn.send_bytes(arr), None)
        self.assertEqual(conn.recv_bytes_into(target),
                         len(arr) * target.itemsize)
        self.assertEqual(list(target), padded)

        # Receive at an offset of three items.
        target = array.array('i', [0]*10)
        padded = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
        self.assertEqual(conn.send_bytes(arr), None)
        self.assertEqual(conn.recv_bytes_into(target, 3 * target.itemsize),
                         len(arr) * target.itemsize)
        self.assertEqual(list(target), padded)

        # A 40-byte buffer cannot take the long message; the exception
        # must carry the complete message.
        target = bytearray(latin(' ' * 40))
        self.assertEqual(conn.send_bytes(longmsg), None)
        try:
            res = conn.recv_bytes_into(target)
        except multiprocessing.BufferTooShort as exc:
            self.assertEqual(exc.args, (longmsg,))
        else:
            self.fail('expected BufferTooShort, got %s' % res)

    poll = TimingWrapper(conn.poll)

    # Nothing is pending: poll() is False, at once for no/negative
    # timeout and after about TIMEOUT1 seconds for a positive one.
    self.assertEqual(poll(), False)
    self.assertTimingAlmostEqual(poll.elapsed, 0)

    self.assertEqual(poll(-1), False)
    self.assertTimingAlmostEqual(poll.elapsed, 0)

    self.assertEqual(poll(TIMEOUT1), False)
    self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

    conn.send(None)

    # With the echo on its way, poll() reports data without waiting.
    self.assertEqual(poll(TIMEOUT1), True)
    self.assertTimingAlmostEqual(poll.elapsed, 0)

    self.assertEqual(conn.recv(), None)

    really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16 MiB
    conn.send_bytes(really_big_msg)
    self.assertEqual(conn.recv_bytes(), really_big_msg)

    conn.send_bytes(SENTINEL)                          # tell child to quit
    child_conn.close()

    if uses_processes:
        self.assertEqual(conn.readable, True)
        self.assertEqual(conn.writable, True)
        # The other end is closed, so reads now hit end-of-file.
        self.assertRaises(EOFError, conn.recv)
        self.assertRaises(EOFError, conn.recv_bytes)

    echoer.join()
1763
def test_duplex_false(self):
    # A one-way pipe carries data from writer to reader only.
    reader, writer = self.Pipe(duplex=False)
    self.assertEqual(writer.send(1), None)
    self.assertEqual(reader.recv(), 1)
    if self.TYPE != 'processes':
        return

    # Each end advertises exactly one direction ...
    self.assertEqual(reader.readable, True)
    self.assertEqual(reader.writable, False)
    self.assertEqual(writer.readable, False)
    self.assertEqual(writer.writable, True)
    # ... and using an end in the wrong direction raises IOError.
    with self.assertRaises(IOError):
        reader.send(2)
    with self.assertRaises(IOError):
        writer.recv()
    with self.assertRaises(IOError):
        writer.poll()
1776
def test_spawn_close(self):
    # We test that a pipe connection can be closed by parent
    # process immediately after child is spawned.  On Windows this
    # would have sometimes failed on old versions because
    # child_conn would be closed before the child got a chance to
    # duplicate it.
    conn, child_conn = self.Pipe()

    echoer = self.Process(target=self._echo, args=(child_conn,))
    echoer.daemon = True
    echoer.start()
    # This close may complete before the child has initialized.
    child_conn.close()

    greeting = latin('hello')
    conn.send_bytes(greeting)
    echoed = conn.recv_bytes()
    self.assertEqual(echoed, greeting)

    # Tell the child to stop, then tidy up our end.
    conn.send_bytes(SENTINEL)
    conn.close()
    echoer.join()
1797
def test_sendbytes(self):
    # Check the offset and size arguments of send_bytes().
    if self.TYPE != 'processes':
        # Report the test as skipped (as test_fd_transfer does) instead
        # of returning silently, which would be counted as a pass.
        self.skipTest('test not appropriate for {}'.format(self.TYPE))

    msg = latin('abcdefghijklmnopqrstuvwxyz')
    a, b = self.Pipe()

    a.send_bytes(msg)
    self.assertEqual(b.recv_bytes(), msg)

    # Offset only: everything from the offset to the end is sent.
    a.send_bytes(msg, 5)
    self.assertEqual(b.recv_bytes(), msg[5:])

    # Offset and size.
    a.send_bytes(msg, 7, 8)
    self.assertEqual(b.recv_bytes(), msg[7:7+8])

    # An offset equal to the length sends an empty message.
    a.send_bytes(msg, 26)
    self.assertEqual(b.recv_bytes(), latin(''))

    a.send_bytes(msg, 26, 0)
    self.assertEqual(b.recv_bytes(), latin(''))

    # Offsets and sizes that fall outside the message are rejected.
    self.assertRaises(ValueError, a.send_bytes, msg, 27)

    self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

    self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

    self.assertRaises(ValueError, a.send_bytes, msg, -1)

    self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1829
@classmethod
def _is_fd_assigned(cls, fd):
    """Return True if ``os.fstat(fd)`` succeeds, False if it fails with EBADF.

    Any other OSError is re-raised.
    """
    try:
        os.fstat(fd)
    except OSError as exc:
        if exc.errno != errno.EBADF:
            raise
        return False
    return True
1840
@classmethod
def _writefd(cls, conn, data, create_dummy_fds=False):
    """Receive a handle over *conn*, write *data* to it and close it.

    When *create_dummy_fds* is true, every descriptor number below 256
    that is currently unassigned is first made a duplicate of the
    connection's own descriptor.
    """
    if create_dummy_fds:
        # The generator is lazy, so each number is checked right before
        # it is (possibly) filled.
        unassigned = (n for n in range(0, 256)
                      if not cls._is_fd_assigned(n))
        for n in unassigned:
            os.dup2(conn.fileno(), n)
    handle = reduction.recv_handle(conn)
    if msvcrt:
        handle = msvcrt.open_osfhandle(handle, os.O_WRONLY)
    os.write(handle, data)
    os.close(handle)
1852
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
def test_fd_transfer(self):
    """Send an open file's handle to a child, which writes b"foo" to it."""
    if self.TYPE != 'processes':
        self.skipTest("only makes sense with processes")
    parent_end, child_end = self.Pipe(duplex=True)

    writer = self.Process(target=self._writefd, args=(child_end, b"foo"))
    writer.daemon = True
    writer.start()
    self.addCleanup(test.support.unlink, test.support.TESTFN)
    with open(test.support.TESTFN, "wb") as out:
        handle = out.fileno()
        if msvcrt:
            handle = msvcrt.get_osfhandle(handle)
        reduction.send_handle(parent_end, handle, writer.pid)
    writer.join()

    # The child's write must be visible in the file.
    with open(test.support.TESTFN, "rb") as result:
        self.assertEqual(result.read(), b"foo")
1871
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
@unittest.skipIf(sys.platform == "win32",
                 "test semantics don't make sense on Windows")
@unittest.skipIf(MAXFD <= 256,
                 "largest assignable fd number is too small")
@unittest.skipUnless(hasattr(os, "dup2"),
                     "test needs os.dup2()")
def test_large_fd_transfer(self):
    """Transfer a descriptor numbered 256 or above (issue #11657)."""
    if self.TYPE != 'processes':
        self.skipTest("only makes sense with processes")
    parent_end, child_end = self.Pipe(duplex=True)

    writer = self.Process(target=self._writefd,
                          args=(child_end, b"bar", True))
    writer.daemon = True
    writer.start()
    self.addCleanup(test.support.unlink, test.support.TESTFN)
    with open(test.support.TESTFN, "wb") as out:
        small_fd = out.fileno()
        # Pick the first unassigned descriptor number in [256, MAXFD).
        free_numbers = (n for n in range(256, MAXFD)
                        if not self._is_fd_assigned(n))
        big_fd = next(free_numbers, None)
        if big_fd is None:
            self.fail("could not find an unassigned large file descriptor")
        os.dup2(small_fd, big_fd)
        try:
            reduction.send_handle(parent_end, big_fd, writer.pid)
        finally:
            os.close(big_fd)
    writer.join()
    with open(test.support.TESTFN, "rb") as result:
        self.assertEqual(result.read(), b"bar")
1904
@classmethod
def _send_data_without_fd(cls, conn):
    """Write one NUL byte straight to *conn*'s descriptor.

    The byte is written with ``os.write`` on ``conn.fileno()``, so no
    file descriptor accompanies it.
    """
    # The first parameter of a classmethod is the class, so it is named
    # ``cls`` (it was misleadingly called ``self``).
    os.write(conn.fileno(), b"\0")
1908
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
@unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
def test_missing_fd_transfer(self):
    """recv_handle() must raise when the data carries no descriptor.

    Checks that an exception is raised when received data is not
    accompanied by a file descriptor in ancillary data.
    """
    if self.TYPE != 'processes':
        self.skipTest("only makes sense with processes")
    parent_end, child_end = self.Pipe(duplex=True)

    sender = self.Process(target=self._send_data_without_fd,
                          args=(child_end,))
    sender.daemon = True
    sender.start()
    with self.assertRaises(RuntimeError):
        reduction.recv_handle(parent_end)
    sender.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001923
class _TestListener(BaseTestCase):
    """Listener checks that do not need a client."""

    # This must be a real tuple. A bare ('processes') is just the string
    # 'processes', and a membership test against a string is a substring
    # search that only gives the right answer by accident.
    ALLOWED_TYPES = ('processes',)

    def test_multiple_bind(self):
        """Binding a second Listener to an address already in use fails."""
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            self.addCleanup(l.close)
            self.assertRaises(OSError, self.connection.Listener,
                              l.address, family)
1934
class _TestListenerClient(BaseTestCase):
    """Round trips between a Listener and a Client in another process."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        """Connect to *address*, send 'hello' and close the connection."""
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        """A client's message arrives for every available address family."""
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(l.address,))
            p.daemon = True
            p.start()
            conn = l.accept()
            self.assertEqual(conn.recv(), 'hello')
            # Close the accepted connection on every iteration instead
            # of leaving it to the garbage collector.
            conn.close()
            p.join()
            l.close()

    def test_issue14725(self):
        """accept() works even if the client already connected and left."""
        l = self.connection.Listener()
        p = self.Process(target=self._test, args=(l.address,))
        p.daemon = True
        p.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        conn = l.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        p.join()
        l.close()
1971
class _TestPoll(unittest.TestCase):
    """Checks of ``poll()`` on pipe connections."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_empty_string(self):
        """An empty message makes poll() report data, and keeps doing so."""
        near, far = self.Pipe()
        self.assertEqual(near.poll(), False)
        far.send_bytes(b'')
        for _ in range(2):
            self.assertEqual(near.poll(), True)

    @classmethod
    def _child_strings(cls, conn, strings):
        """Send each item of *strings* after a 0.1s pause, then close."""
        for chunk in strings:
            time.sleep(0.1)
            conn.send_bytes(chunk)
        conn.close()

    def test_strings(self):
        """Messages, including empty ones, arrive intact and in order."""
        expected = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
        near, far = self.Pipe()
        child = self.Process(target=self._child_strings,
                             args=(far, expected))
        child.start()

        for want in expected:
            # Poll at most 200 times, stopping at the first success.
            any(near.poll(0.01) for _ in range(200))
            got = near.recv_bytes()
            self.assertEqual(want, got)

        child.join()

    @classmethod
    def _child_boundaries(cls, r):
        """Poll *r* for up to 5 seconds without reading from it."""
        # Polling may "pull" a message in to the child process, but we
        # don't want it to pull only part of a message, as that would
        # corrupt the pipe for any other processes which might later
        # read from it.
        r.poll(5)

    def test_boundaries(self):
        """A child polling the pipe must leave whole messages behind."""
        reader, writer = self.Pipe(False)
        child = self.Process(target=self._child_boundaries, args=(reader,))
        child.start()
        time.sleep(2)
        payloads = [b"first", b"second"]
        for item in payloads:
            writer.send_bytes(item)
        writer.close()
        child.join()
        self.assertIn(reader.recv_bytes(), payloads)

    @classmethod
    def _child_dont_merge(cls, b):
        """Send three separate messages: b'a', b'b' and b'cd'."""
        for piece in (b'a', b'b', b'cd'):
            b.send_bytes(piece)

    def test_dont_merge(self):
        """Repeated polling must not merge or consume messages."""
        near, far = self.Pipe()
        for timeout in (0.0, 0.1):
            self.assertEqual(near.poll(timeout), False)

        child = self.Process(target=self._child_dont_merge, args=(far,))
        child.start()

        self.assertEqual(near.recv_bytes(), b'a')
        for timeout in (1.0, 1.0):
            self.assertEqual(near.poll(timeout), True)
        self.assertEqual(near.recv_bytes(), b'b')
        for timeout in (1.0, 1.0, 0.0):
            self.assertEqual(near.poll(timeout), True)
        self.assertEqual(near.recv_bytes(), b'cd')

        child.join()
2049
Benjamin Petersone711caf2008-06-11 16:44:04 +00002050#
2051# Test of sending connection and socket objects between processes
2052#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002053
# Intermittent fails on Mac OS X -- see Issue14669 and Issue12958
@unittest.skipIf(sys.platform == "darwin", "fd passing unreliable on Mac OS X")
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
class _TestPicklingConnections(BaseTestCase):
    """Send connection and socket objects between processes."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def tearDownClass(cls):
        """Stop the resource sharer, waiting at most 5 seconds."""
        from multiprocessing.reduction import resource_sharer
        resource_sharer.stop(timeout=5)

    @classmethod
    def _listener(cls, conn, families):
        """Accept one client per family, then one on a plain socket.

        For each listener the address is sent over *conn* first, then the
        accepted connection object itself.
        """
        for fam in families:
            l = cls.connection.Listener(family=fam)
            conn.send(l.address)
            new_conn = l.accept()
            conn.send(new_conn)
            new_conn.close()
            l.close()

        l = socket.socket()
        l.bind(('localhost', 0))
        l.listen(1)
        conn.send(l.getsockname())
        new_conn, _ = l.accept()
        conn.send(new_conn)
        new_conn.close()
        l.close()

        # Block until the parent sends a final message.
        conn.recv()

    @classmethod
    def _remote(cls, conn):
        """Connect to each address received and send the message uppercased.

        ``(address, msg)`` pairs are read from *conn* until ``None``
        arrives; one more pair is then served over a plain socket.
        """
        for (address, msg) in iter(conn.recv, None):
            client = cls.connection.Client(address)
            client.send(msg.upper())
            client.close()

        address, msg = conn.recv()
        client = socket.socket()
        client.connect(address)
        client.sendall(msg.upper())
        client.close()

        conn.close()

    def test_pickling(self):
        """Connections accepted in one process are usable in another."""
        families = self.connection.families

        lconn, lconn0 = self.Pipe()
        lp = self.Process(target=self._listener, args=(lconn0, families))
        lp.daemon = True
        lp.start()
        lconn0.close()

        rconn, rconn0 = self.Pipe()
        rp = self.Process(target=self._remote, args=(rconn0,))
        rp.daemon = True
        rp.start()
        rconn0.close()

        for fam in families:
            msg = ('This connection uses family %s' % fam).encode('ascii')
            address = lconn.recv()
            rconn.send((address, msg))
            new_conn = lconn.recv()
            self.assertEqual(new_conn.recv(), msg.upper())

        # Switch the remote process to its plain-socket phase.
        rconn.send(None)

        msg = latin('This connection uses a normal socket')
        address = lconn.recv()
        rconn.send((address, msg))
        new_conn = lconn.recv()
        # Read until the peer closes the socket.
        buf = []
        while True:
            s = new_conn.recv(100)
            if not s:
                break
            buf.append(s)
        buf = b''.join(buf)
        self.assertEqual(buf, msg.upper())
        new_conn.close()

        # Release the listener process from its final recv().
        lconn.send(None)

        rconn.close()
        lconn.close()

        lp.join()
        rp.join()

    @classmethod
    def child_access(cls, conn):
        """Receive a writable end and then a readable end, using each once."""
        w = conn.recv()
        w.send('all is well')
        w.close()

        r = conn.recv()
        msg = r.recv()
        conn.send(msg*2)

        conn.close()

    def test_access(self):
        """One-way pipe ends keep working after being sent to a child."""
        # On Windows, if we do not specify a destination pid when
        # using DupHandle then we need to be careful to use the
        # correct access flags for DuplicateHandle(), or else
        # DupHandle.detach() will raise PermissionError.  For example,
        # for a read only pipe handle we should use
        # access=FILE_GENERIC_READ.  (Unfortunately
        # DUPLICATE_SAME_ACCESS does not work.)
        conn, child_conn = self.Pipe()
        p = self.Process(target=self.child_access, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()

        r, w = self.Pipe(duplex=False)
        conn.send(w)
        w.close()
        self.assertEqual(r.recv(), 'all is well')
        r.close()

        r, w = self.Pipe(duplex=False)
        conn.send(r)
        r.close()
        w.send('foobar')
        w.close()
        self.assertEqual(conn.recv(), 'foobar'*2)

        # Wait for the child like test_pickling does, so it does not
        # outlive the test.
        p.join()
2186
Benjamin Petersone711caf2008-06-11 16:44:04 +00002187#
2188#
2189#
2190
class _TestHeap(BaseTestCase):
    """Consistency checks for the multiprocessing.heap allocator."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        """Free and occupied blocks must tile every arena without gaps."""
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                # Drop a random block; a separate name avoids rebinding
                # the loop variable.
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap (the list is not called ``all``,
        # which would shadow the builtin)
        layout = []
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for seq in list(heap._len_to_seq.values()):
            for arena, start, stop in seq:
                layout.append((heap._arenas.index(arena), start, stop,
                               stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            layout.append((heap._arenas.index(arena), start, stop,
                           stop-start, 'occupied'))

        layout.sort()

        # Each block either starts where the previous one stops, or is
        # the first block (offset 0) of a different arena.
        for current, following in zip(layout, layout[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, _) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        """Freeing blocks from the garbage collector must not deadlock."""
        # Check that freeing of blocks by the garbage collector doesn't
        # deadlock (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for _ in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a
2255
Benjamin Petersone711caf2008-06-11 16:44:04 +00002256#
2257#
2258#
2259
# Structure with one c_int field and one c_double field, used by
# _TestSharedCTypes as a shared value and as a copy() source.
class _Foo(Structure):
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]
2265
class _TestSharedCTypes(BaseTestCase):
    """Shared ctypes objects modified by a child must change in the parent."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        """Skip every test when HAS_SHAREDCTYPES is false."""
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        """Double every shared object in place (runs in the child)."""
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        """Values doubled in a child process are seen doubled here."""
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        shared = (x, y, foo, arr, string)
        worker = self.Process(target=self._double, args=shared)
        worker.daemon = True
        worker.start()
        worker.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        """Repeat test_sharedctypes with lock=True."""
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        """copy() yields a structure independent of its source."""
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
2315
2316#
2317#
2318#
2319
class _TestFinalize(BaseTestCase):
    """Check when and in which order util.Finalize callbacks run."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        """Register finalizers that each send a tag over *conn*, then exit.

        Runs in the child process.  The statement order matters: the
        parent asserts the exact sequence of tags it receives.
        """
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # No exitpriority is given here; 'c' is absent from the list the
        # parent expects.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing priority 0; the parent expects them in
        # the order d03, d02, d01.
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        # 'STOP' is the sentinel on which the parent stops reading.
        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        """The child's tags must arrive in the expected order."""
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.daemon = True
        p.start()
        p.join()

        # Read tags until the 'STOP' sentinel.
        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2372
2373#
2374# Test that from ... import * works for each module
2375#
2376
class _TestImportStar(BaseTestCase):
    """Every name a module lists in ``__all__`` must exist on the module."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        """Import each multiprocessing module and verify its ``__all__``."""
        module_names = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.synchronize', 'multiprocessing.util'
            ]

        # Optional modules are only checked when they are usable.
        if HAS_REDUCTION:
            module_names.append('multiprocessing.reduction')

        if c_int is not None:
            # This module requires _ctypes
            module_names.append('multiprocessing.sharedctypes')

        for module_name in module_names:
            __import__(module_name)
            module = sys.modules[module_name]
            exported = getattr(module, '__all__', ())

            for attr in exported:
                self.assertTrue(
                    hasattr(module, attr),
                    '%r does not have attribute %r' % (module, attr)
                    )
2405
2406#
2407# Quick test that logging works -- does not test logging output
2408#
2409
class _TestLogging(BaseTestCase):
    """Quick checks that logging works; log output itself is not tested."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        """The multiprocessing logger exists and accepts log calls."""
        logger = multiprocessing.get_logger()
        # Check the logger before using it, not after.
        self.assertIsNotNone(logger)
        logger.setLevel(util.SUBWARNING)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        """Send the logger's effective level over *conn* (runs in child)."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        """A child sees the logger level, or the root level when unset."""
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        self.addCleanup(reader.close)
        self.addCleanup(writer.close)
        # Restore both levels even when an assertion below fails.
        # Cleanups run last-in first-out, so the root logger is restored
        # first and the multiprocessing logger second.
        self.addCleanup(logger.setLevel, LOG_LEVEL)
        self.addCleanup(root_logger.setLevel, root_level)

        logger.setLevel(LEVEL1)
        p = self.Process(target=self._test_level, args=(writer,))
        p.daemon = True
        p.start()
        self.assertEqual(LEVEL1, reader.recv())
        # Wait for the child so it does not outlive the test.
        p.join()

        # With the logger's own level unset, the root level applies.
        logger.setLevel(logging.NOTSET)
        root_logger.setLevel(LEVEL2)
        p = self.Process(target=self._test_level, args=(writer,))
        p.daemon = True
        p.start()
        self.assertEqual(LEVEL2, reader.recv())
        p.join()
2452
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002453
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002454# class _TestLoggingProcessName(BaseTestCase):
2455#
2456# def handle(self, record):
2457# assert record.processName == multiprocessing.current_process().name
2458# self.__handled = True
2459#
2460# def test_logging(self):
2461# handler = logging.Handler()
2462# handler.handle = self.handle
2463# self.__handled = False
2464# # Bypass getLogger() and side-effects
2465# logger = logging.getLoggerClass()(
2466# 'multiprocessing.test.TestLoggingProcessName')
2467# logger.addHandler(handler)
2468# logger.propagate = False
2469#
2470# logger.warn('foo')
2471# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002472
Benjamin Petersone711caf2008-06-11 16:44:04 +00002473#
Jesse Noller6214edd2009-01-19 16:23:53 +00002474# Test to verify handle verification, see issue 3321
2475#
2476
class TestInvalidHandle(unittest.TestCase):
    """Connections built on invalid handles must be rejected (issue 3321)."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        """A bogus handle fails on poll(); a negative one fails at creation."""
        rejected = (ValueError, IOError)
        Connection = multiprocessing.connection.Connection

        conn = Connection(44977608)
        try:
            with self.assertRaises(rejected):
                conn.poll()
        finally:
            # Hack private attribute _handle to avoid printing an error
            # in conn.__del__
            conn._handle = None

        with self.assertRaises(rejected):
            Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002490
Jesse Noller6214edd2009-01-19 16:23:53 +00002491#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002492# Functions used to create test cases from the base ones in this module
2493#
2494
def get_attributes(Source, names):
    """Return a dict mapping each of *names* to that attribute of *Source*.

    Plain Python functions are wrapped in ``staticmethod``; every other
    object is stored unchanged.
    """
    function_type = type(get_attributes)
    d = {}
    for name in names:
        obj = getattr(Source, name)
        # isinstance() is the idiomatic type check; the result is the
        # same as the former ``type(obj) == type(...)`` comparison.
        if isinstance(obj, function_type):
            obj = staticmethod(obj)
        d[name] = obj
    return d
2503
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from this module's ``_Test*`` bases.

    For each global whose name starts with ``_Test`` and whose
    ``ALLOWED_TYPES`` contains *type*, a class deriving from that base,
    ``unittest.TestCase`` and *Mixin* is created.  It is named
    ``'With' + type.capitalize() + <base name without the underscore>``
    and its ``__module__`` is copied from *Mixin*.  Returns a dict mapping
    the new names to the new classes.
    """
    module_globals = globals()
    prefix = 'With' + type.capitalize()
    generated = {}

    for name in list(module_globals.keys()):
        if not name.startswith('_Test'):
            continue
        base = module_globals[name]
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        Temp.__name__ = prefix + name[1:]
        Temp.__module__ = Mixin.__module__
        generated[Temp.__name__] = Temp
    return generated
2520
2521#
2522# Create test cases
2523#
2524
# Mixin that exposes the names listed below from the multiprocessing
# package as class attributes, for test cases of type 'processes'.
class ProcessesMixin(object):
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed attributes into the class namespace;
    # get_attributes() wraps plain functions in staticmethod.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue', 'Pool'
        )))

# Generate the 'processes' test classes and publish them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2537
2538
# Mixin that exposes the names listed below from a SyncManager instance
# as class attributes, for test cases of type 'manager'.
class ManagerMixin(object):
    TYPE = 'manager'
    Process = multiprocessing.Process
    # The instance is created with object.__new__, so SyncManager.__init__
    # is not run here.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed attributes of that instance into the class namespace.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
       'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue', 'Pool'
        )))

# Generate the 'manager' test classes and publish them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2551
2552
# Mixin that exposes the names listed below from multiprocessing.dummy
# as class attributes, for test cases of type 'threads'.
class ThreadsMixin(object):
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed attributes into the class namespace;
    # get_attributes() wraps plain functions in staticmethod.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue', 'Pool'
        )))

# Generate the 'threads' test classes and publish them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2565
class OtherTest(unittest.TestCase):
    """Authentication failures in the connection challenge handshake."""

    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        """deliver_challenge() rejects a bogus reply from the peer."""
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        """answer_challenge() rejects a bogus reply to its answer."""
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                # First read: the challenge marker; second read: garbage;
                # any later read: empty bytes.
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
2594
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002595#
2596# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2597#
2598
def initializer(ns):
    """Increment ``ns.test`` by one.

    Passed as the *initializer* callback by the tests in TestInitializers.
    """
    ns.test += 1
2601
class TestInitializers(unittest.TestCase):
    """Initializer support in Manager.start() and Pool (issue 5585)."""

    def setUp(self):
        """Start a manager holding a namespace whose ``test`` is 0."""
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        """Shut the manager down and wait for it to finish."""
        self.mgr.shutdown()
        self.mgr.join()

    def test_manager_initializer(self):
        """start() rejects a non-callable and runs a valid initializer."""
        m = multiprocessing.managers.SyncManager()
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the second manager down even if the assertion fails,
            # so its process is not leaked.
            m.shutdown()
            m.join()

    def test_pool_initializer(self):
        """Pool rejects a non-callable and runs a valid initializer."""
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2626
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002627#
2628# Issue 5155, 5313, 5331: Test process in processes
2629# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2630#
2631
def _ThisSubProcess(q):
    """Attempt one non-blocking ``get`` on *q*, ignoring an empty queue.

    The fetched item, if any, is discarded (it used to be bound to an
    unused local).
    """
    try:
        q.get(block=False)
    except pyqueue.Empty:
        pass
2637
def _TestProcess(q):
    """Run _ThisSubProcess in a daemon child with a fresh queue.

    The *q* argument is accepted but not used.
    """
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_ThisSubProcess,
                                    args=(inner_queue,))
    child.daemon = True
    child.start()
    child.join()
2644
def _afunc(x):
    """Return *x* multiplied by itself."""
    return x*x
2647
def pool_in_process():
    """Map _afunc over seven integers with a four-worker pool.

    The result of the map is discarded.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Close and join the pool even if map() raises, so the workers
        # are not left running.
        pool.close()
        pool.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002653
class _file_like(object):
    """Buffer writes per process and forward them to a delegate on flush."""

    def __init__(self, delegate):
        self._delegate = delegate
        self._pid = None

    @property
    def cache(self):
        """Return the buffer, replaced by an empty one when the pid changes."""
        current_pid = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if current_pid != self._pid:
            self._pid, self._cache = current_pid, []
        return self._cache

    def write(self, data):
        """Append *data* to the current process's buffer."""
        self.cache.append(data)

    def flush(self):
        """Write the joined buffer to the delegate, then empty the buffer."""
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
2674
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Processes that start further processes (issues 5155, 5313, 5331)."""

    def test_queue_in_process(self):
        """A child can create a queue and run a grandchild process."""
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        """A child can create and use a pool."""
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        """Flushing a _file_like writes the buffered data to its delegate."""
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # The process object is created but never started, as before.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # A bare ``assert`` is stripped under ``python -O`` and reports
        # less on failure; use the unittest assertion instead.
        self.assertEqual(sio.getvalue(), 'foo')
2695
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002696
2697class TestWait(unittest.TestCase):
2698
@classmethod
def _child_test_wait(cls, w, slow):
    """Send ten ``(index, pid)`` tuples over *w*, then close it.

    When *slow* is true, a random pause of up to 0.1s precedes each send.
    """
    own_pid_of = os.getpid
    for index in range(10):
        if slow:
            time.sleep(random.random()*0.1)
        w.send((index, own_pid_of()))
    w.close()
2706
def test_wait(self, slow=False):
    """wait() on several pipes delivers every message from every child."""
    from multiprocessing.connection import wait
    pending = []
    children = []
    received = []

    for _ in range(4):
        reader, writer = multiprocessing.Pipe(duplex=False)
        child = multiprocessing.Process(target=self._child_test_wait,
                                        args=(writer, slow))
        child.daemon = True
        child.start()
        # The parent keeps only the read end.
        writer.close()
        pending.append(reader)
        children.append(child)
        self.addCleanup(child.join)

    # Drain ready readers until every one of them has reached EOF.
    while pending:
        for ready in wait(pending):
            try:
                item = ready.recv()
            except EOFError:
                pending.remove(ready)
                ready.close()
            else:
                received.append(item)

    received.sort()
    expected = sorted((i, child.pid)
                      for i in range(10) for child in children)
    self.assertEqual(received, expected)
2736
2737 @classmethod
2738 def _child_test_wait_socket(cls, address, slow):
2739 s = socket.socket()
2740 s.connect(address)
2741 for i in range(10):
2742 if slow:
2743 time.sleep(random.random()*0.1)
2744 s.sendall(('%s\n' % i).encode('ascii'))
2745 s.close()
2746
2747 def test_wait_socket(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002748 from multiprocessing.connection import wait
2749 l = socket.socket()
2750 l.bind(('', 0))
2751 l.listen(4)
2752 addr = ('localhost', l.getsockname()[1])
2753 readers = []
2754 procs = []
2755 dic = {}
2756
2757 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01002758 p = multiprocessing.Process(target=self._child_test_wait_socket,
2759 args=(addr, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002760 p.daemon = True
2761 p.start()
2762 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01002763 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002764
2765 for i in range(4):
2766 r, _ = l.accept()
2767 readers.append(r)
2768 dic[r] = []
2769 l.close()
2770
2771 while readers:
2772 for r in wait(readers):
2773 msg = r.recv(32)
2774 if not msg:
2775 readers.remove(r)
2776 r.close()
2777 else:
2778 dic[r].append(msg)
2779
2780 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
2781 for v in dic.values():
2782 self.assertEqual(b''.join(v), expected)
2783
2784 def test_wait_slow(self):
2785 self.test_wait(True)
2786
2787 def test_wait_socket_slow(self):
Richard Oudkerk104b3f42012-05-08 16:08:07 +01002788 self.test_wait_socket(True)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002789
2790 def test_wait_timeout(self):
2791 from multiprocessing.connection import wait
2792
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002793 expected = 5
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002794 a, b = multiprocessing.Pipe()
2795
2796 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002797 res = wait([a, b], expected)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002798 delta = time.time() - start
2799
2800 self.assertEqual(res, [])
Richard Oudkerk6dbca362012-05-06 16:46:36 +01002801 self.assertLess(delta, expected * 2)
2802 self.assertGreater(delta, expected * 0.5)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002803
2804 b.send(None)
2805
2806 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002807 res = wait([a, b], 20)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002808 delta = time.time() - start
2809
2810 self.assertEqual(res, [a])
Antoine Pitrou37749772012-03-09 18:40:15 +01002811 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002812
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002813 @classmethod
2814 def signal_and_sleep(cls, sem, period):
2815 sem.release()
2816 time.sleep(period)
2817
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002818 def test_wait_integer(self):
2819 from multiprocessing.connection import wait
2820
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002821 expected = 3
2822 sem = multiprocessing.Semaphore(0)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002823 a, b = multiprocessing.Pipe()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002824 p = multiprocessing.Process(target=self.signal_and_sleep,
2825 args=(sem, expected))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002826
2827 p.start()
2828 self.assertIsInstance(p.sentinel, int)
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002829 self.assertTrue(sem.acquire(timeout=20))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002830
2831 start = time.time()
2832 res = wait([a, p.sentinel, b], expected + 20)
2833 delta = time.time() - start
2834
2835 self.assertEqual(res, [p.sentinel])
Antoine Pitrou37749772012-03-09 18:40:15 +01002836 self.assertLess(delta, expected + 2)
2837 self.assertGreater(delta, expected - 2)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002838
2839 a.send(None)
2840
2841 start = time.time()
2842 res = wait([a, p.sentinel, b], 20)
2843 delta = time.time() - start
2844
2845 self.assertEqual(res, [p.sentinel, b])
Antoine Pitrou37749772012-03-09 18:40:15 +01002846 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002847
2848 b.send(None)
2849
2850 start = time.time()
2851 res = wait([a, p.sentinel, b], 20)
2852 delta = time.time() - start
2853
2854 self.assertEqual(res, [a, p.sentinel, b])
Antoine Pitrou37749772012-03-09 18:40:15 +01002855 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002856
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002857 p.terminate()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002858 p.join()
2859
Richard Oudkerk59d54042012-05-10 16:11:12 +01002860 def test_neg_timeout(self):
2861 from multiprocessing.connection import wait
2862 a, b = multiprocessing.Pipe()
2863 t = time.time()
2864 res = wait([a], timeout=-1)
2865 t = time.time() - t
2866 self.assertEqual(res, [])
2867 self.assertLess(t, 1)
2868 a.close()
2869 b.close()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002870
#
# Issue 14151: Listener must reject an address whose family is not
# supported on the current platform
#
2874
class TestInvalidFamily(unittest.TestCase):
    """Listener must raise ValueError for an address it cannot serve on the
    running platform."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_family(self):
        # Backslash-style address (presumably a Windows pipe name) used on
        # a non-Windows platform.
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener,
                          r'\\.\test')

    @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
    def test_invalid_family_win32(self):
        # Filesystem-path address used on Windows.
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener,
                          '/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02002886
Richard Oudkerk77c84f22012-05-18 14:28:02 +01002887#
2888# Issue 12098: check sys.flags of child matches that for parent
2889#
2890
class TestFlags(unittest.TestCase):
    """Check that a process started by multiprocessing reports the same
    sys.flags as the process that started it."""

    @classmethod
    def run_in_grandchild(cls, conn):
        """Send this interpreter's sys.flags over *conn* as a plain tuple."""
        own_flags = tuple(sys.flags)
        conn.send(own_flags)

    @classmethod
    def run_in_child(cls):
        """Print a JSON pair: this process's flags and a grandchild's."""
        import json
        reader, writer = multiprocessing.Pipe(duplex=False)
        grandchild = multiprocessing.Process(target=cls.run_in_grandchild,
                                             args=(writer,))
        grandchild.start()
        received = reader.recv()
        grandchild.join()
        reader.close()
        writer.close()
        pair = (tuple(sys.flags), received)
        print(json.dumps(pair))

    def test_flags(self):
        import json, subprocess
        # start child process using unusual flags
        command = [sys.executable, '-E', '-S', '-O', '-c',
                   'from test.test_multiprocessing import TestFlags; '
                   'TestFlags.run_in_child()']
        output = subprocess.check_output(command)
        child_flags, grandchild_flags = json.loads(output.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
2918
# Test case classes that test_main() appends, in this order, after the sorted
# contents of testcases_processes, testcases_threads and testcases_manager.
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
                   TestStdinBadfiledescriptor, TestWait, TestInvalidFamily,
                   TestFlags]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002922
Benjamin Petersone711caf2008-06-11 16:44:04 +00002923#
2924#
2925#
2926
def test_main(run=None):
    """Assemble every multiprocessing test case and execute the suite.

    *run* is a callable taking a TestSuite; when omitted,
    test.support.run_unittest is used.  Shared pools and the manager are
    created on the mixin classes before the run and torn down afterwards,
    whether or not the run raises.

    Raises unittest.SkipTest on Linux if an RLock cannot be created.
    """
    if sys.platform.startswith("linux"):
        try:
            probe_lock = multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared fixtures used by the mixin-based test cases.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    def class_name(tc):
        return tc.__name__

    # Each group is ordered by class name; the standalone cases come last.
    testcases = []
    for group in (testcases_processes, testcases_threads, testcases_manager):
        testcases.extend(sorted(group.values(), key=class_name))
    testcases.extend(testcases_other)

    load_tests = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite()
    for testcase in testcases:
        suite.addTest(load_tests(testcase))

    try:
        run(suite)
    finally:
        # Tear-down order is significant; keep it as is.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.pool.join()
        ManagerMixin.manager.shutdown()
        ManagerMixin.manager.join()
        ThreadsMixin.pool.join()
        ProcessesMixin.pool.join()
        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002970
def main():
    """Run the whole suite through a verbose TextTestRunner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)
2973
# Run the test suite when this file is executed as a script.
if __name__ == '__main__':
    main()