blob: fd223129c3477a4ba1838eb923735c73be50b7aa [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
# Import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
30import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.dummy
33import multiprocessing.connection
34import multiprocessing.managers
35import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
Antoine Pitroubcb39d42011-08-23 19:46:22 +020038from multiprocessing import util, reduction
Benjamin Petersone711caf2008-06-11 16:44:04 +000039
Brian Curtinafa88b52010-10-07 01:12:19 +000040try:
41 from multiprocessing.sharedctypes import Value, copy
42 HAS_SHAREDCTYPES = True
43except ImportError:
44 HAS_SHAREDCTYPES = False
45
Antoine Pitroubcb39d42011-08-23 19:46:22 +020046try:
47 import msvcrt
48except ImportError:
49 msvcrt = None
50
Benjamin Petersone711caf2008-06-11 16:44:04 +000051#
52#
53#
54
def latin(s):
    """Return the string *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000057
Benjamin Petersone711caf2008-06-11 16:44:04 +000058#
59# Constants
60#
61
# Log level constant for the tests; swap in logging.DEBUG for more output.
LOG_LEVEL = util.SUBWARNING

# Short delay (in seconds) the tests sleep for to let other
# processes/threads make progress.
DELTA = 0.1

# Making CHECK_TIMINGS true makes tests take a lot longer and can sometimes
# cause some non-serious failures because some calls block a bit longer
# than expected.
CHECK_TIMINGS = False

# Longer, distinct timeouts are only used when timings are actually checked.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# True unless the extension module flags its semaphore get_value() as broken.
HAVE_GETVALUE = not getattr(
    _multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False
)

WIN32 = (sys.platform == "win32")
79
# Value of the SC_OPEN_MAX system configuration variable, with a fallback of
# 256 when it cannot be queried.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # os.sysconf may be missing on this platform (AttributeError), the name
    # may be unknown (ValueError), or the query itself may fail (OSError).
    # Anything else (e.g. KeyboardInterrupt) is deliberately not swallowed.
    MAXFD = 256
84
Benjamin Petersone711caf2008-06-11 16:44:04 +000085#
Florent Xiclunafd1b0932010-03-28 00:25:02 +000086# Some tests require ctypes
87#
88
89try:
Florent Xiclunaaa171062010-08-14 15:56:42 +000090 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +000091except ImportError:
92 Structure = object
93 c_int = c_double = None
94
95#
Benjamin Petersone711caf2008-06-11 16:44:04 +000096# Creates a wrapper for a function which records the time it takes to finish
97#
98
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    ``elapsed`` is ``None`` until the first call.  After every call --
    whether the wrapped function returns or raises -- it holds the duration
    of that call in seconds.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        # A monotonic clock is used so that adjustments of the system
        # wall clock during the call cannot skew the measured duration
        # or make it negative.
        start = time.monotonic()
        try:
            return self.func(*args, **kwds)
        finally:
            # Recorded in ``finally`` so the timing is available even when
            # the wrapped call raises.
            self.elapsed = time.monotonic() - start
111
112#
113# Base class for test cases
114#
115
class BaseTestCase(object):
    # Base class for the test cases in this file.  It relies on assertion
    # methods (assertEqual, assertAlmostEqual) being supplied by the
    # concrete test class it is combined with.

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timings are only compared when CHECK_TIMINGS is switched on.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Check that func(*args) equals value, unless func reports that it
        # is not implemented, in which case nothing is checked.
        try:
            result = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, result)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
138
Benjamin Petersone711caf2008-06-11 16:44:04 +0000139#
140# Return the value of a semaphore
141#
142
def get_value(self):
    """Return the current value of the semaphore-like object *self*.

    The object's own ``get_value()`` method is tried first, then the
    ``_Semaphore__value`` attribute, then the ``_value`` attribute.

    Raises NotImplementedError if none of these is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
154
155#
156# Testcases
157#
158
class _TestProcess(BaseTestCase):
    # Tests for process objects: attributes of the current process, the
    # start/join life cycle, terminate(), active_children() and processes
    # started from within other processes.

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            # Report the case as skipped instead of silently passing.
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertGreater(len(authkey), 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Child side of test_process: report back what the child received
        # and how it sees itself.
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # State before the process is started.
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIs(type(self.active_children()), list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # State while the process is running.
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The queue has room for one item only, so the values arrive in
        # the order the child puts them.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # State after the process has finished.
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        # Sleep long enough that the parent has to terminate the child.
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            # Report the case as skipped instead of silently passing.
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIs(type(cpus), int)
        self.assertGreaterEqual(cpus, 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Send our own id, then (down to a depth of two) start two children
        # one after the other, each of which does the same with an extended
        # id.  Every child is joined before the next one starts.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
308
309#
310#
311#
312
313class _UpperCaser(multiprocessing.Process):
314
315 def __init__(self):
316 multiprocessing.Process.__init__(self)
317 self.child_conn, self.parent_conn = multiprocessing.Pipe()
318
319 def run(self):
320 self.parent_conn.close()
321 for s in iter(self.child_conn.recv, None):
322 self.child_conn.send(s.upper())
323 self.child_conn.close()
324
325 def submit(self, s):
326 assert type(s) is str
327 self.parent_conn.send(s)
328 return self.parent_conn.recv()
329
330 def stop(self):
331 self.parent_conn.send(None)
332 self.parent_conn.close()
333 self.child_conn.close()
334
class _TestSubclassingProcess(BaseTestCase):
    # Checks that a Process subclass (_UpperCaser) can be started, used and
    # stopped.

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.daemon = True
        worker.start()
        # Round-trip two strings through the child process, in order.
        for text, expected in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), expected)
        worker.stop()
        worker.join()
347
348#
349#
350#
351
def queue_empty(q):
    """Return whether queue *q* is empty.

    Uses ``q.empty()`` when the queue has that method, otherwise compares
    ``q.qsize()`` with zero.
    """
    return q.empty() if hasattr(q, 'empty') else q.qsize() == 0
357
def queue_full(q, maxsize):
    """Return whether queue *q* is full.

    Uses ``q.full()`` when the queue has that method, otherwise compares
    ``q.qsize()`` with *maxsize*.
    """
    return q.full() if hasattr(q, 'full') else q.qsize() == maxsize
363
364
class _TestQueue(BaseTestCase):
    # Tests for queue objects: blocking, non-blocking and timed put()/get(),
    # use of a queue from a child process, qsize() and the
    # task_done()/join() protocol of joinable queues.

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_put: once allowed to start, drain the six items
        # the parent has put and signal the parent.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue using every calling convention of put().
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Every further put must raise Full; the expected elapsed times are
        # only enforced when CHECK_TIMINGS is enabled.
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_get: once allowed to start, put four items and
        # signal the parent.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Every further get must raise Empty; the expected elapsed times are
        # only enforced when CHECK_TIMINGS is enabled.
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # Report the case as skipped instead of silently passing.
            self.skipTest('qsize method not implemented')
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker: acknowledge every item until the None sentinel arrives.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        # Returns once every item put so far has been acknowledged.
        queue.join()

        # One sentinel per worker so that each of them exits its loop.
        for _ in workers:
            queue.put(None)

        for p in workers:
            p.join()
574
575#
576#
577#
578
class _TestLock(BaseTestCase):
    # Basic acquire/release behaviour of Lock and RLock objects.

    def test_lock(self):
        mutex = self.Lock()
        self.assertEqual(mutex.acquire(), True)
        # A non-blocking acquire of an already held lock must fail.
        self.assertEqual(mutex.acquire(False), False)
        self.assertEqual(mutex.release(), None)
        # Releasing a lock that is not held is an error.
        self.assertRaises((ValueError, threading.ThreadError), mutex.release)

    def test_rlock(self):
        rlock = self.RLock()
        # The same owner can acquire the lock repeatedly ...
        for _ in range(3):
            self.assertEqual(rlock.acquire(), True)
        # ... and has to release it the same number of times.
        for _ in range(3):
            self.assertEqual(rlock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), rlock.release)

    def test_lock_context(self):
        # The lock must be usable as a context manager.
        with self.Lock():
            pass
601
Benjamin Petersone711caf2008-06-11 16:44:04 +0000602
class _TestSemaphore(BaseTestCase):
    # Tests for Semaphore and BoundedSemaphore objects.

    def _test_semaphore(self, sem):
        # Expects a semaphore with an initial value of 2 and returns it to
        # that value.  The value itself is only checked where get_value()
        # is implemented.
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # An unbounded semaphore may be released beyond its initial value.
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            # Report the case as skipped instead of silently passing.
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # The semaphore is never released, so every acquire must fail; the
        # expected elapsed times are only enforced when CHECK_TIMINGS is
        # enabled.
        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
655
656
class _TestCondition(BaseTestCase):
    # Tests for Condition objects: notify(), notify_all() and timed wait().

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Waiter: announce via `sleeping` that it is about to wait, wait on
        # the condition, then announce via `woken` that the wait returned.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Distinct names are kept for the two waiters so that both of them
        # can be joined at the end of the test.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # both waiters have been notified, so wait for both to finish
        thread.join()
        proc.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken; poll for a while rather than relying on
        # a single fixed delay
        for i in range(10):
            try:
                if get_value(woken) == 6:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        # Nobody notifies the condition, so the wait must time out.
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
794
795
class _TestEvent(BaseTestCase):
    # Tests for Event objects: is_set(), set(), clear() and wait().

    @classmethod
    def _test_event(cls, event):
        # Child side of test_event: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A new event starts out unset.
        self.assertEqual(event.is_set(), False)

        # wait() returns the state of the flag, so waiting on an unset event
        # returns False once the timeout has expired.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once the event is set, wait() returns True without blocking.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # The child sets the event after TIMEOUT2, which ends the wait here.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # The child has nothing left to do after setting the event; join it
        # so that it does not outlive the test.
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000836
837#
838#
839#
840
class _TestValue(BaseTestCase):
    # Tests for shared Value and RawValue objects.

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value assigned by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Child side of test_value: overwrite each shared value with the
        # third element of its codes_values entry.
        for shared, spec in zip(values, cls.codes_values):
            shared.value = spec[2]

    def test_value(self, raw=False):
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        # Before the child runs, every object holds its initial value.
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.daemon = True
        proc.start()
        proc.join()

        # The assignments made by the child must be visible here.
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # Default arguments: get_lock() and get_obj() must be callable.
        val1 = self.Value('i', 5)
        val1.get_lock()
        val1.get_obj()

        # The same with an explicit lock=None.
        val2 = self.Value('i', 5, lock=None)
        val2.get_lock()
        val2.get_obj()

        # A lock passed in by the caller is the one returned by get_lock().
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        val3.get_obj()
        self.assertEqual(lock, lock3)

        # With lock=False the result has neither accessor.
        unlocked = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))

        # Anything else passed as the lock is rejected.
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue objects have neither accessor either.
        raw_value = self.RawValue('i', 5)
        self.assertFalse(hasattr(raw_value, 'get_lock'))
        self.assertFalse(hasattr(raw_value, 'get_obj'))
908
Benjamin Petersone711caf2008-06-11 16:44:04 +0000909
class _TestArray(BaseTestCase):
    # Tests for the Array / RawArray factories supplied by the test mixin.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        """Turn *seq* into its running (prefix) sums, in place."""
        for pos in range(1, len(seq)):
            seq[pos] += seq[pos - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        factory = self.RawArray if raw else self.Array
        arr = factory('i', seq)

        # Length, single-item access and slicing mirror the source list.
        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Apply the same slice assignment to the shared array and the list.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Compute the expected result locally ...
        self.f(seq)

        # ... and let a child process do the same on the shared array.
        worker = self.Process(target=self.f, args=(arr,))
        worker.daemon = True
        worker.start()
        worker.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        expected_zeros = [0] * size
        expected_filled = list(range(10))
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), expected_zeros)
            arr[:] = range(10)
            self.assertEqual(list(arr), expected_filled)
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        accessors = ('get_lock', 'get_obj')

        # Default locking: both accessors must be callable.
        default_arr = self.Array('i', list(range(10)))
        default_arr.get_lock()
        default_arr.get_obj()

        # lock=None: both accessors must be callable as well.
        none_arr = self.Array('i', list(range(10)), lock=None)
        none_arr.get_lock()
        none_arr.get_obj()

        # A caller-supplied lock must be the one get_lock() hands back.
        supplied = self.Lock()
        locked_arr = self.Array('i', list(range(10)), lock=supplied)
        returned = locked_arr.get_lock()
        locked_arr.get_obj()
        self.assertEqual(supplied, returned)

        # lock=False: the resulting object exposes neither accessor.
        unlocked_arr = self.Array('i', range(10), lock=False)
        for name in accessors:
            self.assertFalse(hasattr(unlocked_arr, name))

        # A string is not an acceptable lock argument.
        with self.assertRaises(AttributeError):
            self.Array('i', range(10), lock='notalock')

        # RawArray objects expose neither accessor.
        raw_arr = self.RawArray('i', range(10))
        for name in accessors:
            self.assertFalse(hasattr(raw_arr, name))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000988
989#
990#
991#
992
class _TestContainers(BaseTestCase):
    # Tests for the list, dict and Namespace objects supplied by the
    # test mixin (manager flavour only).

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        ten = list(range(10))
        five = list(range(5))

        a = self.list(list(range(10)))
        self.assertEqual(a[:], ten)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], five)

        # Indexing, and slicing past the end of the list.
        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        # In-place repetition followed by concatenation.
        b *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        # None of the operations on b may have touched a.
        self.assertEqual(a[:], ten)

        # A shared list built from other shared lists.
        nested = self.list([a, b])
        self.assertEqual(
            nested[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A change made through a must be visible through the container.
        holder = self.list([a])
        a.append('hello')
        self.assertEqual(holder[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            d[code] = chr(code)

        letters = [chr(code) for code in indices]
        self.assertEqual(d.copy(), {code: chr(code) for code in indices})
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), list(zip(indices, letters)))

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))

        # After deleting job, only name is expected in the string form.
        del n.job
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1048
1049#
1050#
1051#
1052
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return *x* multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001056
class _TestPool(BaseTestCase):
    # Tests for the pool object supplied by the test mixin as self.pool,
    # using the module-level sqr() as the work function.

    def test_apply(self):
        run = self.pool.apply
        # Positional arguments, then keyword arguments.
        self.assertEqual(run(sqr, (5,)), sqr(5))
        self.assertEqual(run(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        small = list(range(10))
        large = list(range(100))
        self.assertEqual(pmap(sqr, small), [sqr(v) for v in small])
        self.assertEqual(pmap(sqr, large, chunksize=20),
                         [sqr(v) for v in large])

    def test_map_chunksize(self):
        # An empty iterable with an explicit chunksize must not stall.
        pending = self.pool.map_async(sqr, [], chunksize=1)
        try:
            pending.get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        # sqr sleeps for TIMEOUT1, so get() should block about that long.
        pending = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        timed_get = TimingWrapper(pending.get)
        self.assertEqual(timed_get(), 49)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout passed to get().
        pending = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        timed_get = TimingWrapper(pending.get)
        self.assertRaises(multiprocessing.TimeoutError, timed_get,
                          timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT2)

    def test_imap(self):
        # Consume the whole iterator at once.
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), [sqr(v) for v in range(10)])

        # Step through item by item, then expect exhaustion.
        it = self.pool.imap(sqr, list(range(10)))
        for value in range(10):
            self.assertEqual(next(it), value*value)
        self.assertRaises(StopIteration, it.__next__)

        # Same again with a larger input and an explicit chunksize.
        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for value in range(1000):
            self.assertEqual(next(it), value*value)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        expected = [sqr(v) for v in range(1000)]

        # Results may arrive in any order, so compare them sorted.
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), expected)

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), expected)

    def test_make_pool(self):
        # Non-positive process counts are rejected.
        for bad_size in (-1, 0):
            self.assertRaises(ValueError, multiprocessing.Pool, bad_size)

        pool = multiprocessing.Pool(3)
        self.assertEqual(3, len(pool._pool))
        pool.close()
        pool.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # Queue far more work than can finish, then terminate at once.
        delays = [0.1] * 10000
        result = self.pool.map_async(time.sleep, delays, chunksize=1)
        self.pool.terminate()
        timed_join = TimingWrapper(self.pool.join)
        timed_join()
        self.assertLess(timed_join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001135
def raising():
    """Always raise KeyError("key")."""
    error = KeyError("key")
    raise error
Jesse Noller1f0b6582010-01-27 03:36:01 +00001138
def unpickleable_result():
    """Return a freshly created lambda that evaluates to 42 when called."""
    # Kept as a lambda on purpose: the pool tests rely on this result
    # failing to be encoded when it is sent back.
    return lambda: 42
1141
class _TestPoolWorkerErrors(BaseTestCase):
    # Checks how a pool reports failures: tasks that raise, and tasks
    # whose result cannot be sent back.
    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        pool = multiprocessing.Pool(2)

        # One-slot holder for whatever the error callback receives.
        seen = [None]
        def remember(exc):
            seen[0] = exc

        pending = pool.apply_async(raising, error_callback=remember)
        self.assertRaises(KeyError, pending.get)
        self.assertTrue(seen[0])
        self.assertIsInstance(seen[0], KeyError)

        pool.close()
        pool.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        pool = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        for attempt in range(20):

            seen = [None]
            def remember(exc):
                seen[0] = exc

            pending = pool.apply_async(unpickleable_result,
                                       error_callback=remember)
            self.assertRaises(MaybeEncodingError, pending.get)
            wrapped = seen[0]
            self.assertTrue(wrapped)
            self.assertIsInstance(seen[0], MaybeEncodingError)
            self.assertIsNotNone(wrapped.exc)
            self.assertIsNotNone(wrapped.value)

        pool.close()
        pool.join()
1181
class _TestPoolWorkerLifetime(BaseTestCase):
    # Checks that workers created with maxtasksperchild get replaced.
    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        pids_before = [worker.pid for worker in pool._pool]

        # Run many tasks so each worker gets replaced (hopefully)
        pending = [pool.apply_async(sqr, (n, )) for n in range(100)]

        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for n, result in enumerate(pending):
            self.assertEqual(result.get(), sqr(n))

        # Refill the pool
        pool._repopulate_pool()

        # Wait until all workers are alive
        # (countdown * DELTA = 5 seconds max startup process time)
        tries_left = 50
        while tries_left and not all(w.is_alive() for w in pool._pool):
            tries_left -= 1
            time.sleep(DELTA)
        pids_after = [worker.pid for worker in pool._pool]

        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, pids_before)
        self.assertNotIn(None, pids_after)

        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(pids_before), sorted(pids_after))
        pool.close()
        pool.join()
1213
Benjamin Petersone711caf2008-06-11 16:44:04 +00001214#
1215# Test that manager has expected number of shared objects left
1216#
1217
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Take the debug snapshot right after the count so that what gets
        # printed on failure was fetched alongside the number checked.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the snapshot once; it used to be followed by a second,
            # freshly fetched copy, which only duplicated the output.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1236
1237#
1238# Test of creating a customized manager class
1239#
1240
1241from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1242
class FooBar(object):
    """Small class registered with MyManager to test method exposure."""

    def f(self):
        """Return the marker string 'f()'."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError

    def _h(self):
        """Return the marker string '_h()'."""
        return '_h()'
1250
def baz():
    """Generate the squares of 0 through 9, in increasing order."""
    n = 0
    while n < 10:
        yield n*n
        n += 1
1254
class IteratorProxy(BaseProxy):
    """Proxy that forwards iteration to its referent's __next__ method."""

    # The only referent method this proxy calls.
    _exposed_ = ('__next__',)

    def __next__(self):
        # Delegate to the referent through the generic call mechanism.
        return self._callmethod('__next__')

    def __iter__(self):
        # The proxy is its own iterator.
        return self
1261
class MyManager(BaseManager):
    """Customized manager class; its types are registered separately."""
1264
# Types served by MyManager:
#   'Foo' - FooBar, registered without an explicit `exposed` tuple
#   'Bar' - FooBar, with only f() and _h() listed as exposed
#   'baz' - the baz() generator, accessed through IteratorProxy
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1268
1269
class _TestMyManager(BaseTestCase):
    # Tests the types registered with the customized MyManager class.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        squares = manager.baz()

        def visible(proxy):
            # Names of the candidate methods that the proxy actually has.
            return [name for name in ('f', 'g', '_h') if hasattr(proxy, name)]

        foo_methods = visible(foo)
        bar_methods = visible(bar)

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        # Foo: f() and g() are reachable, _h is refused by the server.
        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        # Bar: f() and _h() are reachable, directly or via _callmethod.
        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        # Iterating the proxy yields the squares produced by baz().
        self.assertEqual(list(squares), [i*i for i in range(10)])

        manager.shutdown()
1301
1302#
1303# Test of connecting to a remote server and using xmlrpclib for serialization
1304#
1305
# The one queue object handed out by get_queue().
_queue = pyqueue.Queue()


def get_queue():
    """Return the module-level queue (the same object on every call)."""
    return _queue
1309
class QueueManager(BaseManager):
    '''Manager class used by the server process.

    'get_queue' is registered with the module-level get_queue() callable.
    '''


QueueManager.register('get_queue', callable=get_queue)
1313
class QueueManager2(BaseManager):
    '''Manager class which specifies the same interface as QueueManager.

    'get_queue' is registered by name only, without a callable.
    '''


QueueManager2.register('get_queue')
1317
1318
# Serializer name passed as `serializer=` to the QueueManager and
# QueueManager2 instances created by the tests below.
SERIALIZER = 'xmlrpclib'
1320
class _TestRemoteManager(BaseTestCase):
    # Starts a QueueManager server and talks to it through QueueManager2
    # clients, with SERIALIZER selecting the serializer on both sides.

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        """Connect to the manager at *address* and put one tuple on its queue."""
        client = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        remote_queue = client.get_queue()
        remote_queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        # Port 0: the server address is read back from manager.address.
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        putter = self.Process(target=self._putter,
                              args=(manager.address, authkey))
        putter.daemon = True
        putter.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        expected = ['hello world', None, True, 2.25]
        self.assertEqual(queue.get(), expected)

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1362
class _TestManagerRestart(BaseTestCase):
    # Checks that a manager can be shut down and a new one started
    # straight away on the same address.

    @classmethod
    def _putter(cls, address, authkey):
        """Connect to the manager at *address* and put one string on its queue."""
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        # Remember the concrete address so it can be reused after shutdown.
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # Drop the proxy before stopping the server it refers to.
        del queue
        manager.shutdown()

        # Restart immediately on the address that was just released.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except IOError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # The replacement manager must be started as well; otherwise
            # the shutdown() below is called on a manager that never ran.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001404
Benjamin Petersone711caf2008-06-11 16:44:04 +00001405#
1406#
1407#
1408
# Value that ends the receive loop in _TestConnection._echo; the tests
# send it to tell the child to quit.
SENTINEL = latin('')
1410
class _TestConnection(BaseTestCase):
    # Tests for the connection objects returned by self.Pipe(), plus
    # handle passing through multiprocessing.reduction.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        """Send every received byte message back until SENTINEL arrives."""
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        # Object and byte messages come back unchanged from the echo child.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zeroed buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A message longer than the buffer must raise BufferTooShort
            # carrying the complete message.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing pending: poll() returns at once, poll(t) waits about t.
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # Once the echo is available poll() no longer waits.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16 MiB
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            # Each end only works in its own direction.
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        # Report a skip, like the fd transfer tests below, instead of
        # silently passing for the other flavours.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # Offset only, then offset plus size.
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the length sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Out-of-range or negative offsets and sizes are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        """Return True if os.fstat(fd) succeeds, False if it fails with EBADF."""
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            # Any other error is unexpected; let it propagate.
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        """Receive a handle over *conn*, write *data* to it and close it.

        With *create_dummy_fds* true, every unassigned descriptor below
        256 is first filled with a duplicate of *conn*'s descriptor.
        """
        if create_dummy_fds:
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        # Remove the scratch file when the test ends, pass or fail.
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        # The child wrote through the handle it received.
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        # Remove the scratch file when the test ends, pass or fail.
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Find a free descriptor number at or above 256.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")
1632
1633
class _TestListenerClient(BaseTestCase):
    # Round trip between a Listener and a Client for each address family
    # listed by self.connection.families.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        """Connect to *address*, send 'hello' and close the connection."""
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            sender = self.Process(target=self._test,
                                  args=(listener.address,))
            sender.daemon = True
            sender.start()

            # Accept the sender's connection and read its greeting.
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            sender.join()
            listener.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001654#
1655# Test of sending connection and socket objects between processes
1656#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001657"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001658class _TestPicklingConnections(BaseTestCase):
1659
1660 ALLOWED_TYPES = ('processes',)
1661
1662 def _listener(self, conn, families):
1663 for fam in families:
1664 l = self.connection.Listener(family=fam)
1665 conn.send(l.address)
1666 new_conn = l.accept()
1667 conn.send(new_conn)
1668
1669 if self.TYPE == 'processes':
1670 l = socket.socket()
1671 l.bind(('localhost', 0))
1672 conn.send(l.getsockname())
1673 l.listen(1)
1674 new_conn, addr = l.accept()
1675 conn.send(new_conn)
1676
1677 conn.recv()
1678
1679 def _remote(self, conn):
1680 for (address, msg) in iter(conn.recv, None):
1681 client = self.connection.Client(address)
1682 client.send(msg.upper())
1683 client.close()
1684
1685 if self.TYPE == 'processes':
1686 address, msg = conn.recv()
1687 client = socket.socket()
1688 client.connect(address)
1689 client.sendall(msg.upper())
1690 client.close()
1691
1692 conn.close()
1693
1694 def test_pickling(self):
1695 try:
1696 multiprocessing.allow_connection_pickling()
1697 except ImportError:
1698 return
1699
1700 families = self.connection.families
1701
1702 lconn, lconn0 = self.Pipe()
1703 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02001704 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001705 lp.start()
1706 lconn0.close()
1707
1708 rconn, rconn0 = self.Pipe()
1709 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001710 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001711 rp.start()
1712 rconn0.close()
1713
1714 for fam in families:
1715 msg = ('This connection uses family %s' % fam).encode('ascii')
1716 address = lconn.recv()
1717 rconn.send((address, msg))
1718 new_conn = lconn.recv()
1719 self.assertEqual(new_conn.recv(), msg.upper())
1720
1721 rconn.send(None)
1722
1723 if self.TYPE == 'processes':
1724 msg = latin('This connection uses a normal socket')
1725 address = lconn.recv()
1726 rconn.send((address, msg))
1727 if hasattr(socket, 'fromfd'):
1728 new_conn = lconn.recv()
1729 self.assertEqual(new_conn.recv(100), msg.upper())
1730 else:
1731 # XXX On Windows with Py2.6 need to backport fromfd()
1732 discard = lconn.recv_bytes()
1733
1734 lconn.send(None)
1735
1736 rconn.close()
1737 lconn.close()
1738
1739 lp.join()
1740 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001741"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001742#
1743#
1744#
1745
class _TestHeap(BaseTestCase):
    # Tests for the allocator behind multiprocessing.heap.BufferWrapper.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            b = multiprocessing.heap.BufferWrapper(size)
            blocks.append(b)
            if len(blocks) > maxblocks:
                # Drop a random block; a separate name keeps the loop
                # variable from being overwritten.
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap
        # ("layout" rather than "all", which would shadow the builtin)
        layout = []
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for L in list(heap._len_to_seq.values()):
            for arena, start, stop in L:
                layout.append((heap._arenas.index(arena), start, stop,
                               stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            layout.append((heap._arenas.index(arena), start, stop,
                           stop-start, 'occupied'))

        layout.sort()

        # Each block must either end where the next one begins, or be
        # followed by a block starting at offset 0 of a different arena.
        for current, following in zip(layout, layout[1:]):
            arena, start, stop = current[:3]
            narena, nstart = following[:2]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for _ in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a
1810
Benjamin Petersone711caf2008-06-11 16:44:04 +00001811#
1812#
1813#
1814
class _Foo(Structure):
    # Two-field structure used by the shared ctypes tests:
    # an int field x followed by a double field y.
    _fields_ = [('x', c_int), ('y', c_double)]
1820
class _TestSharedCTypes(BaseTestCase):
    # Tests for shared ctypes values, arrays and structures modified by
    # a child process.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        """Double every argument in place (run in the child process)."""
        for shared in (x, y):
            shared.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for pos in range(len(arr)):
            arr[pos] *= 2

    def test_sharedctypes(self, lock=False):
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        # Let a child process double everything ...
        worker = self.Process(target=self._double,
                              args=(x, y, foo, arr, string))
        worker.daemon = True
        worker.start()
        worker.join()

        # ... and check that the parent sees the doubled values.
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for pos in range(10):
            self.assertAlmostEqual(arr[pos], pos*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario as test_sharedctypes, with lock=True.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        # Changing the original must not affect the copy.
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1870
1871#
1872#
1873#
1874
class _TestFinalize(BaseTestCase):
    # Checks when, and in which order, util.Finalize callbacks fire.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Child-side body.  Each finalizer reports its tag through *conn*;
        # the statement order below is significant for the expected result.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # Registered without an exitpriority; the parent's expected list
        # does not contain 'c'.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        # Lowest priority: acts as the end-of-stream marker for the parent.
        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.daemon = True
        child.start()
        child.join()

        # Drain everything the child reported up to the 'STOP' sentinel.
        received = list(iter(conn.recv, 'STOP'))
        self.assertEqual(received,
                         ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1927
1928#
1929# Test that from ... import * works for each module
1930#
1931
class _TestImportStar(BaseTestCase):
    # Verifies that every name a multiprocessing module lists in __all__
    # really exists on that module.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = [
            'multiprocessing',
            'multiprocessing.connection',
            'multiprocessing.heap',
            'multiprocessing.managers',
            'multiprocessing.pool',
            'multiprocessing.process',
            'multiprocessing.reduction',
            'multiprocessing.synchronize',
            'multiprocessing.util'
            ]

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for name in modules:
            __import__(name)
            mod = sys.modules[name]
            exported = getattr(mod, '__all__', ())

            for attr in exported:
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
1958
1959#
1960# Quick test that logging works -- does not test logging output
1961#
1962
class _TestLogging(BaseTestCase):
    # Quick checks of the multiprocessing logger; log output itself is not
    # inspected.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        # Validate the logger before it is used for the first time.
        self.assertIsNotNone(logger)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Always put the level back so later tests are unaffected.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Child-side body: report the effective level seen in the child.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # A level set directly on the multiprocessing logger must be
            # what the child observes.
            logger.setLevel(LEVEL1)
            p = self.Process(target=self._test_level, args=(writer,))
            p.daemon = True
            p.start()
            level = reader.recv()
            p.join()    # reap the child before asserting
            self.assertEqual(LEVEL1, level)

            # With the multiprocessing logger unset, the child must fall
            # back to the root logger's level.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            p = self.Process(target=self._test_level, args=(writer,))
            p.daemon = True
            p.start()
            level = reader.recv()
            p.join()
            self.assertEqual(LEVEL2, level)
        finally:
            # Restore global logging state and release the pipe even when
            # an assertion above fails.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            writer.close()
            reader.close()
2005
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002006
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002007# class _TestLoggingProcessName(BaseTestCase):
2008#
2009# def handle(self, record):
2010# assert record.processName == multiprocessing.current_process().name
2011# self.__handled = True
2012#
2013# def test_logging(self):
2014# handler = logging.Handler()
2015# handler.handle = self.handle
2016# self.__handled = False
2017# # Bypass getLogger() and side-effects
2018# logger = logging.getLoggerClass()(
2019# 'multiprocessing.test.TestLoggingProcessName')
2020# logger.addHandler(handler)
2021# logger.propagate = False
2022#
2023# logger.warn('foo')
2024# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002025
Benjamin Petersone711caf2008-06-11 16:44:04 +00002026#
Jesse Noller6214edd2009-01-19 16:23:53 +00002027# Test to verify handle verification, see issue 3321
2028#
2029
class TestInvalidHandle(unittest.TestCase):
    # Connection objects built from bad handles must fail with IOError.

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # A Connection can be created from this handle number, but using
        # it must raise.
        bogus = _multiprocessing.Connection(44977608)
        with self.assertRaises(IOError):
            bogus.poll()
        # A negative handle must be rejected at construction time.
        with self.assertRaises(IOError):
            _multiprocessing.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002037
Jesse Noller6214edd2009-01-19 16:23:53 +00002038#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002039# Functions used to create test cases from the base ones in this module
2040#
2041
def get_attributes(Source, names):
    """Collect the attributes called *names* from *Source* into a dict.

    Plain Python functions are wrapped in ``staticmethod`` so that they are
    not turned into bound methods when the dict is merged into a class
    namespace; every other object is stored unchanged.
    """
    function_type = type(get_attributes)

    def _wrap(obj):
        return staticmethod(obj) if isinstance(obj, function_type) else obj

    return {name: _wrap(getattr(Source, name)) for name in names}
2050
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes for one flavour of the test suite.

    Every module-level name starting with ``_Test`` whose ``ALLOWED_TYPES``
    contains *type* is combined with ``unittest.TestCase`` and *Mixin* into
    a new class named ``With<Type><name without the leading underscore>``.

    Returns a dict mapping the new class names to the new classes.
    """
    result = {}
    glob = globals()
    Type = type.capitalize()

    # Iterate over a snapshot: callers add the result back into globals().
    for name in list(glob.keys()):
        if not name.startswith('_Test'):
            continue
        base = glob[name]
        # Not every ``_Test``-prefixed global is a test base (this module
        # also defines ``_Test``-prefixed helper functions); objects without
        # ALLOWED_TYPES are skipped instead of raising AttributeError.
        if type not in getattr(base, 'ALLOWED_TYPES', ()):
            continue
        newname = 'With' + Type + name[1:]
        class Temp(base, unittest.TestCase, Mixin):
            pass
        result[newname] = Temp
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
    return result
2067
2068#
2069# Create test cases
2070#
2071
class ProcessesMixin(object):
    # Supplies the real multiprocessing primitives to the generated
    # "processes" test cases.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed multiprocessing attributes into the class namespace.
    locals().update(get_attributes(multiprocessing, (
        'Queue',
        'Lock',
        'RLock',
        'Semaphore',
        'BoundedSemaphore',
        'Condition',
        'Event',
        'Value',
        'Array',
        'RawValue',
        'RawArray',
        'current_process',
        'active_children',
        'Pipe',
        'connection',
        'JoinableQueue'
        )))

# Generate the concrete test classes and publish them at module level.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2084
2085
class ManagerMixin(object):
    # Supplies manager-proxied primitives to the generated "manager" test
    # cases.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocated without running __init__; test_main() calls __init__() and
    # start() on this object before the suite runs.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed manager attributes into the class namespace.
    locals().update(get_attributes(manager, (
        'Queue',
        'Lock',
        'RLock',
        'Semaphore',
        'BoundedSemaphore',
        'Condition',
        'Event',
        'Value',
        'Array',
        'list',
        'dict',
        'Namespace',
        'JoinableQueue'
        )))

# Generate the concrete test classes and publish them at module level.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2098
2099
class ThreadsMixin(object):
    # Supplies the thread-backed multiprocessing.dummy primitives to the
    # generated "threads" test cases.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed multiprocessing.dummy attributes into the class
    # namespace.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue',
        'Lock',
        'RLock',
        'Semaphore',
        'BoundedSemaphore',
        'Condition',
        'Event',
        'Value',
        'Array',
        'current_process',
        'active_children',
        'Pipe',
        'connection',
        'dict',
        'list',
        'Namespace',
        'JoinableQueue'
        )))

# Generate the concrete test classes and publish them at module level.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2112
class OtherTest(unittest.TestCase):
    # Authentication-handshake failure paths, driven through fake
    # connection objects so that no real socket or pipe is needed.
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        class _FakeConnection(object):
            # Answers every challenge with a digest that cannot match.
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.deliver_challenge,
                          _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        connection = multiprocessing.connection
        # The challenge prefix is looked up under its public name first;
        # interpreters that only expose the private ``_CHALLENGE`` spelling
        # are supported through the fallback.
        challenge = getattr(connection, 'CHALLENGE', None)
        if challenge is None:
            challenge = connection._CHALLENGE

        class _FakeConnection(object):
            # First read: the challenge prefix; second read: a bogus
            # reply; afterwards: empty bytes.
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return challenge
                elif self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          connection.answer_challenge,
                          _FakeConnection(), b'abc')
2141
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002142#
2143# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2144#
2145
def initializer(ns):
    """Increment the ``test`` counter on *ns*.

    Passed as the ``initializer`` callable to manager and pool start-up in
    the initializer tests.
    """
    ns.test += 1
2148
class TestInitializers(unittest.TestCase):
    # Checks the ``initializer`` argument of Manager.start() and Pool().

    def setUp(self):
        # A manager-backed namespace lets the initializer, which runs in
        # another process, report back to the test.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the second manager down even if the assertion fails, so
            # its server process is not leaked.
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2171
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002172#
2173# Issue 5155, 5313, 5331: Test process in processes
2174# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2175#
2176
def _ThisSubProcess(q):
    """Try one non-blocking ``get`` on *q*; an empty queue is not an error.

    The fetched item, if any, is discarded.
    """
    try:
        q.get(block=False)
    except pyqueue.Empty:
        pass
2182
def _TestProcess(q):
    """Start a daemonic grandchild that polls a fresh queue, then wait for it.

    The *q* argument is accepted but not used by this function.
    """
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_ThisSubProcess,
                                    args=(inner_queue,))
    child.daemon = True
    child.start()
    child.join()
2189
def _afunc(x):
    """Return *x* multiplied by itself."""
    return x * x
2192
def pool_in_process():
    """Create a four-worker pool, run one ``map`` over it and tear it down.

    Used as a process target, so the pool is created inside a child process.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Release the workers explicitly instead of leaving them to
        # interpreter-exit cleanup.
        pool.close()
        pool.join()
2196
class _file_like(object):
    # Minimal write-buffering wrapper around *delegate*.  The buffer is
    # keyed on the current pid, so a forked child starts with an empty
    # buffer instead of inheriting the parent's pending data.

    def __init__(self, delegate):
        self._pid = None
        self._delegate = delegate

    @property
    def cache(self):
        current = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid != current:
            self._pid, self._cache = current, []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        # Hand everything buffered so far to the delegate, then start over.
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
2217
class TestStdinBadfiledescriptor(unittest.TestCase):
    # Regression tests for starting processes and pools from inside a
    # child process (issues 5155, 5313, 5331).

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this process object is created but never started;
        # kept as in the original -- confirm whether it was meant to run.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # Use a unittest assertion: a bare ``assert`` is stripped under -O.
        self.assertEqual(sio.getvalue(), 'foo')
2238
# Stand-alone test classes that are not generated through the mixins.
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002241
Benjamin Petersone711caf2008-06-11 16:44:04 +00002242#
2243#
2244#
2245
def test_main(run=None):
    """Set up the shared pools and manager, run every test case, clean up.

    *run* is a callable that accepts a ``unittest.TestSuite``; when it is
    None, ``test.support.run_unittest`` is used.

    Raises ``unittest.SkipTest`` on Linux when an RLock cannot be created
    (see issue 3111).
    """
    if sys.platform.startswith("linux"):
        try:
            # Probe only: the lock object itself is not needed.
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared resources used by the generated test cases of each flavour.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # Tear the shared resources down even when the runner raises, so no
        # worker or manager process outlives the test run.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002283
def main():
    """Run the whole suite through a verbose text test runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)
2286
# Allow the module to be executed directly as a script.
if __name__ == '__main__':
    main()