blob: 6f1c6c44977ace62eb7fcedcc069b919c5d21cf7 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000027# import threading after _multiprocessing to raise a more revelant error
28# message: "No module named _multiprocessing". _multiprocessing is not compiled
29# without thread support.
30import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.dummy
33import multiprocessing.connection
34import multiprocessing.managers
35import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
Antoine Pitroubcb39d42011-08-23 19:46:22 +020038from multiprocessing import util, reduction
Benjamin Petersone711caf2008-06-11 16:44:04 +000039
Brian Curtinafa88b52010-10-07 01:12:19 +000040try:
41 from multiprocessing.sharedctypes import Value, copy
42 HAS_SHAREDCTYPES = True
43except ImportError:
44 HAS_SHAREDCTYPES = False
45
Antoine Pitroubcb39d42011-08-23 19:46:22 +020046try:
47 import msvcrt
48except ImportError:
49 msvcrt = None
50
Benjamin Petersone711caf2008-06-11 16:44:04 +000051#
52#
53#
54
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000055def latin(s):
56 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000057
Benjamin Petersone711caf2008-06-11 16:44:04 +000058#
59# Constants
60#
61
62LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000063#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000064
65DELTA = 0.1
66CHECK_TIMINGS = False # making true makes tests take a lot longer
67 # and can sometimes cause some non-serious
68 # failures because some calls block a bit
69 # longer than expected
70if CHECK_TIMINGS:
71 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
72else:
73 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
74
75HAVE_GETVALUE = not getattr(_multiprocessing,
76 'HAVE_BROKEN_SEM_GETVALUE', False)
77
Jesse Noller6214edd2009-01-19 16:23:53 +000078WIN32 = (sys.platform == "win32")
79
Antoine Pitroubcb39d42011-08-23 19:46:22 +020080try:
81 MAXFD = os.sysconf("SC_OPEN_MAX")
82except:
83 MAXFD = 256
84
Benjamin Petersone711caf2008-06-11 16:44:04 +000085#
Florent Xiclunafd1b0932010-03-28 00:25:02 +000086# Some tests require ctypes
87#
88
89try:
Florent Xiclunaaa171062010-08-14 15:56:42 +000090 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +000091except ImportError:
92 Structure = object
93 c_int = c_double = None
94
95#
Benjamin Petersone711caf2008-06-11 16:44:04 +000096# Creates a wrapper for a function which records the time it takes to finish
97#
98
99class TimingWrapper(object):
100
101 def __init__(self, func):
102 self.func = func
103 self.elapsed = None
104
105 def __call__(self, *args, **kwds):
106 t = time.time()
107 try:
108 return self.func(*args, **kwds)
109 finally:
110 self.elapsed = time.time() - t
111
112#
113# Base class for test cases
114#
115
116class BaseTestCase(object):
117
118 ALLOWED_TYPES = ('processes', 'manager', 'threads')
119
120 def assertTimingAlmostEqual(self, a, b):
121 if CHECK_TIMINGS:
122 self.assertAlmostEqual(a, b, 1)
123
124 def assertReturnsIfImplemented(self, value, func, *args):
125 try:
126 res = func(*args)
127 except NotImplementedError:
128 pass
129 else:
130 return self.assertEqual(value, res)
131
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000132 # For the sanity of Windows users, rather than crashing or freezing in
133 # multiple ways.
134 def __reduce__(self, *args):
135 raise NotImplementedError("shouldn't try to pickle a test case")
136
137 __reduce_ex__ = __reduce__
138
Benjamin Petersone711caf2008-06-11 16:44:04 +0000139#
140# Return the value of a semaphore
141#
142
143def get_value(self):
144 try:
145 return self.get_value()
146 except AttributeError:
147 try:
148 return self._Semaphore__value
149 except AttributeError:
150 try:
151 return self._value
152 except AttributeError:
153 raise NotImplementedError
154
155#
156# Testcases
157#
158
159class _TestProcess(BaseTestCase):
160
161 ALLOWED_TYPES = ('processes', 'threads')
162
163 def test_current(self):
164 if self.TYPE == 'threads':
165 return
166
167 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000168 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000169
170 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000171 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000172 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000173 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000174 self.assertEqual(current.ident, os.getpid())
175 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000176
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000177 @classmethod
178 def _test(cls, q, *args, **kwds):
179 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000180 q.put(args)
181 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000182 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000183 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000184 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000185 q.put(current.pid)
186
187 def test_process(self):
188 q = self.Queue(1)
189 e = self.Event()
190 args = (q, 1, 2)
191 kwargs = {'hello':23, 'bye':2.54}
192 name = 'SomeProcess'
193 p = self.Process(
194 target=self._test, args=args, kwargs=kwargs, name=name
195 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000196 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000197 current = self.current_process()
198
199 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000200 self.assertEqual(p.authkey, current.authkey)
201 self.assertEqual(p.is_alive(), False)
202 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000203 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000204 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000205 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000206
207 p.start()
208
Ezio Melottib3aedd42010-11-20 19:04:17 +0000209 self.assertEqual(p.exitcode, None)
210 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000211 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000212
Ezio Melottib3aedd42010-11-20 19:04:17 +0000213 self.assertEqual(q.get(), args[1:])
214 self.assertEqual(q.get(), kwargs)
215 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000216 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000217 self.assertEqual(q.get(), current.authkey)
218 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000219
220 p.join()
221
Ezio Melottib3aedd42010-11-20 19:04:17 +0000222 self.assertEqual(p.exitcode, 0)
223 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000224 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000225
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000226 @classmethod
227 def _test_terminate(cls):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000228 time.sleep(1000)
229
230 def test_terminate(self):
231 if self.TYPE == 'threads':
232 return
233
234 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000235 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000236 p.start()
237
238 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000239 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000240 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000241
242 p.terminate()
243
244 join = TimingWrapper(p.join)
245 self.assertEqual(join(), None)
246 self.assertTimingAlmostEqual(join.elapsed, 0.0)
247
248 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000249 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000250
251 p.join()
252
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000253 # XXX sometimes get p.exitcode == 0 on Windows ...
254 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000255
256 def test_cpu_count(self):
257 try:
258 cpus = multiprocessing.cpu_count()
259 except NotImplementedError:
260 cpus = 1
261 self.assertTrue(type(cpus) is int)
262 self.assertTrue(cpus >= 1)
263
264 def test_active_children(self):
265 self.assertEqual(type(self.active_children()), list)
266
267 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000268 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000269
270 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000271 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000272
273 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000274 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000275
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000276 @classmethod
277 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000278 from multiprocessing import forking
279 wconn.send(id)
280 if len(id) < 2:
281 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000282 p = cls.Process(
283 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000284 )
285 p.start()
286 p.join()
287
288 def test_recursion(self):
289 rconn, wconn = self.Pipe(duplex=False)
290 self._test_recursion(wconn, [])
291
292 time.sleep(DELTA)
293 result = []
294 while rconn.poll():
295 result.append(rconn.recv())
296
297 expected = [
298 [],
299 [0],
300 [0, 0],
301 [0, 1],
302 [1],
303 [1, 0],
304 [1, 1]
305 ]
306 self.assertEqual(result, expected)
307
308#
309#
310#
311
312class _UpperCaser(multiprocessing.Process):
313
314 def __init__(self):
315 multiprocessing.Process.__init__(self)
316 self.child_conn, self.parent_conn = multiprocessing.Pipe()
317
318 def run(self):
319 self.parent_conn.close()
320 for s in iter(self.child_conn.recv, None):
321 self.child_conn.send(s.upper())
322 self.child_conn.close()
323
324 def submit(self, s):
325 assert type(s) is str
326 self.parent_conn.send(s)
327 return self.parent_conn.recv()
328
329 def stop(self):
330 self.parent_conn.send(None)
331 self.parent_conn.close()
332 self.child_conn.close()
333
334class _TestSubclassingProcess(BaseTestCase):
335
336 ALLOWED_TYPES = ('processes',)
337
338 def test_subclassing(self):
339 uppercaser = _UpperCaser()
340 uppercaser.start()
341 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
342 self.assertEqual(uppercaser.submit('world'), 'WORLD')
343 uppercaser.stop()
344 uppercaser.join()
345
346#
347#
348#
349
350def queue_empty(q):
351 if hasattr(q, 'empty'):
352 return q.empty()
353 else:
354 return q.qsize() == 0
355
356def queue_full(q, maxsize):
357 if hasattr(q, 'full'):
358 return q.full()
359 else:
360 return q.qsize() == maxsize
361
362
363class _TestQueue(BaseTestCase):
364
365
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000366 @classmethod
367 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000368 child_can_start.wait()
369 for i in range(6):
370 queue.get()
371 parent_can_continue.set()
372
373 def test_put(self):
374 MAXSIZE = 6
375 queue = self.Queue(maxsize=MAXSIZE)
376 child_can_start = self.Event()
377 parent_can_continue = self.Event()
378
379 proc = self.Process(
380 target=self._test_put,
381 args=(queue, child_can_start, parent_can_continue)
382 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000383 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000384 proc.start()
385
386 self.assertEqual(queue_empty(queue), True)
387 self.assertEqual(queue_full(queue, MAXSIZE), False)
388
389 queue.put(1)
390 queue.put(2, True)
391 queue.put(3, True, None)
392 queue.put(4, False)
393 queue.put(5, False, None)
394 queue.put_nowait(6)
395
396 # the values may be in buffer but not yet in pipe so sleep a bit
397 time.sleep(DELTA)
398
399 self.assertEqual(queue_empty(queue), False)
400 self.assertEqual(queue_full(queue, MAXSIZE), True)
401
402 put = TimingWrapper(queue.put)
403 put_nowait = TimingWrapper(queue.put_nowait)
404
405 self.assertRaises(pyqueue.Full, put, 7, False)
406 self.assertTimingAlmostEqual(put.elapsed, 0)
407
408 self.assertRaises(pyqueue.Full, put, 7, False, None)
409 self.assertTimingAlmostEqual(put.elapsed, 0)
410
411 self.assertRaises(pyqueue.Full, put_nowait, 7)
412 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
413
414 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
415 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
416
417 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
418 self.assertTimingAlmostEqual(put.elapsed, 0)
419
420 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
421 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
422
423 child_can_start.set()
424 parent_can_continue.wait()
425
426 self.assertEqual(queue_empty(queue), True)
427 self.assertEqual(queue_full(queue, MAXSIZE), False)
428
429 proc.join()
430
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000431 @classmethod
432 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000433 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000434 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000435 queue.put(2)
436 queue.put(3)
437 queue.put(4)
438 queue.put(5)
439 parent_can_continue.set()
440
441 def test_get(self):
442 queue = self.Queue()
443 child_can_start = self.Event()
444 parent_can_continue = self.Event()
445
446 proc = self.Process(
447 target=self._test_get,
448 args=(queue, child_can_start, parent_can_continue)
449 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000450 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000451 proc.start()
452
453 self.assertEqual(queue_empty(queue), True)
454
455 child_can_start.set()
456 parent_can_continue.wait()
457
458 time.sleep(DELTA)
459 self.assertEqual(queue_empty(queue), False)
460
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000461 # Hangs unexpectedly, remove for now
462 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000463 self.assertEqual(queue.get(True, None), 2)
464 self.assertEqual(queue.get(True), 3)
465 self.assertEqual(queue.get(timeout=1), 4)
466 self.assertEqual(queue.get_nowait(), 5)
467
468 self.assertEqual(queue_empty(queue), True)
469
470 get = TimingWrapper(queue.get)
471 get_nowait = TimingWrapper(queue.get_nowait)
472
473 self.assertRaises(pyqueue.Empty, get, False)
474 self.assertTimingAlmostEqual(get.elapsed, 0)
475
476 self.assertRaises(pyqueue.Empty, get, False, None)
477 self.assertTimingAlmostEqual(get.elapsed, 0)
478
479 self.assertRaises(pyqueue.Empty, get_nowait)
480 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
481
482 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
483 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
484
485 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
486 self.assertTimingAlmostEqual(get.elapsed, 0)
487
488 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
489 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
490
491 proc.join()
492
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000493 @classmethod
494 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000495 for i in range(10, 20):
496 queue.put(i)
497 # note that at this point the items may only be buffered, so the
498 # process cannot shutdown until the feeder thread has finished
499 # pushing items onto the pipe.
500
501 def test_fork(self):
502 # Old versions of Queue would fail to create a new feeder
503 # thread for a forked process if the original process had its
504 # own feeder thread. This test checks that this no longer
505 # happens.
506
507 queue = self.Queue()
508
509 # put items on queue so that main process starts a feeder thread
510 for i in range(10):
511 queue.put(i)
512
513 # wait to make sure thread starts before we fork a new process
514 time.sleep(DELTA)
515
516 # fork process
517 p = self.Process(target=self._test_fork, args=(queue,))
518 p.start()
519
520 # check that all expected items are in the queue
521 for i in range(20):
522 self.assertEqual(queue.get(), i)
523 self.assertRaises(pyqueue.Empty, queue.get, False)
524
525 p.join()
526
527 def test_qsize(self):
528 q = self.Queue()
529 try:
530 self.assertEqual(q.qsize(), 0)
531 except NotImplementedError:
532 return
533 q.put(1)
534 self.assertEqual(q.qsize(), 1)
535 q.put(5)
536 self.assertEqual(q.qsize(), 2)
537 q.get()
538 self.assertEqual(q.qsize(), 1)
539 q.get()
540 self.assertEqual(q.qsize(), 0)
541
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000542 @classmethod
543 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000544 for obj in iter(q.get, None):
545 time.sleep(DELTA)
546 q.task_done()
547
548 def test_task_done(self):
549 queue = self.JoinableQueue()
550
551 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000552 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000553
554 workers = [self.Process(target=self._test_task_done, args=(queue,))
555 for i in range(4)]
556
557 for p in workers:
558 p.start()
559
560 for i in range(10):
561 queue.put(i)
562
563 queue.join()
564
565 for p in workers:
566 queue.put(None)
567
568 for p in workers:
569 p.join()
570
571#
572#
573#
574
575class _TestLock(BaseTestCase):
576
577 def test_lock(self):
578 lock = self.Lock()
579 self.assertEqual(lock.acquire(), True)
580 self.assertEqual(lock.acquire(False), False)
581 self.assertEqual(lock.release(), None)
582 self.assertRaises((ValueError, threading.ThreadError), lock.release)
583
584 def test_rlock(self):
585 lock = self.RLock()
586 self.assertEqual(lock.acquire(), True)
587 self.assertEqual(lock.acquire(), True)
588 self.assertEqual(lock.acquire(), True)
589 self.assertEqual(lock.release(), None)
590 self.assertEqual(lock.release(), None)
591 self.assertEqual(lock.release(), None)
592 self.assertRaises((AssertionError, RuntimeError), lock.release)
593
Jesse Nollerf8d00852009-03-31 03:25:07 +0000594 def test_lock_context(self):
595 with self.Lock():
596 pass
597
Benjamin Petersone711caf2008-06-11 16:44:04 +0000598
599class _TestSemaphore(BaseTestCase):
600
601 def _test_semaphore(self, sem):
602 self.assertReturnsIfImplemented(2, get_value, sem)
603 self.assertEqual(sem.acquire(), True)
604 self.assertReturnsIfImplemented(1, get_value, sem)
605 self.assertEqual(sem.acquire(), True)
606 self.assertReturnsIfImplemented(0, get_value, sem)
607 self.assertEqual(sem.acquire(False), False)
608 self.assertReturnsIfImplemented(0, get_value, sem)
609 self.assertEqual(sem.release(), None)
610 self.assertReturnsIfImplemented(1, get_value, sem)
611 self.assertEqual(sem.release(), None)
612 self.assertReturnsIfImplemented(2, get_value, sem)
613
614 def test_semaphore(self):
615 sem = self.Semaphore(2)
616 self._test_semaphore(sem)
617 self.assertEqual(sem.release(), None)
618 self.assertReturnsIfImplemented(3, get_value, sem)
619 self.assertEqual(sem.release(), None)
620 self.assertReturnsIfImplemented(4, get_value, sem)
621
622 def test_bounded_semaphore(self):
623 sem = self.BoundedSemaphore(2)
624 self._test_semaphore(sem)
625 # Currently fails on OS/X
626 #if HAVE_GETVALUE:
627 # self.assertRaises(ValueError, sem.release)
628 # self.assertReturnsIfImplemented(2, get_value, sem)
629
630 def test_timeout(self):
631 if self.TYPE != 'processes':
632 return
633
634 sem = self.Semaphore(0)
635 acquire = TimingWrapper(sem.acquire)
636
637 self.assertEqual(acquire(False), False)
638 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
639
640 self.assertEqual(acquire(False, None), False)
641 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
642
643 self.assertEqual(acquire(False, TIMEOUT1), False)
644 self.assertTimingAlmostEqual(acquire.elapsed, 0)
645
646 self.assertEqual(acquire(True, TIMEOUT2), False)
647 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
648
649 self.assertEqual(acquire(timeout=TIMEOUT3), False)
650 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
651
652
653class _TestCondition(BaseTestCase):
654
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000655 @classmethod
656 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000657 cond.acquire()
658 sleeping.release()
659 cond.wait(timeout)
660 woken.release()
661 cond.release()
662
663 def check_invariant(self, cond):
664 # this is only supposed to succeed when there are no sleepers
665 if self.TYPE == 'processes':
666 try:
667 sleepers = (cond._sleeping_count.get_value() -
668 cond._woken_count.get_value())
669 self.assertEqual(sleepers, 0)
670 self.assertEqual(cond._wait_semaphore.get_value(), 0)
671 except NotImplementedError:
672 pass
673
674 def test_notify(self):
675 cond = self.Condition()
676 sleeping = self.Semaphore(0)
677 woken = self.Semaphore(0)
678
679 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000680 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000681 p.start()
682
683 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000684 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000685 p.start()
686
687 # wait for both children to start sleeping
688 sleeping.acquire()
689 sleeping.acquire()
690
691 # check no process/thread has woken up
692 time.sleep(DELTA)
693 self.assertReturnsIfImplemented(0, get_value, woken)
694
695 # wake up one process/thread
696 cond.acquire()
697 cond.notify()
698 cond.release()
699
700 # check one process/thread has woken up
701 time.sleep(DELTA)
702 self.assertReturnsIfImplemented(1, get_value, woken)
703
704 # wake up another
705 cond.acquire()
706 cond.notify()
707 cond.release()
708
709 # check other has woken up
710 time.sleep(DELTA)
711 self.assertReturnsIfImplemented(2, get_value, woken)
712
713 # check state is not mucked up
714 self.check_invariant(cond)
715 p.join()
716
717 def test_notify_all(self):
718 cond = self.Condition()
719 sleeping = self.Semaphore(0)
720 woken = self.Semaphore(0)
721
722 # start some threads/processes which will timeout
723 for i in range(3):
724 p = self.Process(target=self.f,
725 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000726 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000727 p.start()
728
729 t = threading.Thread(target=self.f,
730 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000731 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000732 t.start()
733
734 # wait for them all to sleep
735 for i in range(6):
736 sleeping.acquire()
737
738 # check they have all timed out
739 for i in range(6):
740 woken.acquire()
741 self.assertReturnsIfImplemented(0, get_value, woken)
742
743 # check state is not mucked up
744 self.check_invariant(cond)
745
746 # start some more threads/processes
747 for i in range(3):
748 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000749 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000750 p.start()
751
752 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000753 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000754 t.start()
755
756 # wait for them to all sleep
757 for i in range(6):
758 sleeping.acquire()
759
760 # check no process/thread has woken up
761 time.sleep(DELTA)
762 self.assertReturnsIfImplemented(0, get_value, woken)
763
764 # wake them all up
765 cond.acquire()
766 cond.notify_all()
767 cond.release()
768
769 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200770 for i in range(10):
771 try:
772 if get_value(woken) == 6:
773 break
774 except NotImplementedError:
775 break
776 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000777 self.assertReturnsIfImplemented(6, get_value, woken)
778
779 # check state is not mucked up
780 self.check_invariant(cond)
781
782 def test_timeout(self):
783 cond = self.Condition()
784 wait = TimingWrapper(cond.wait)
785 cond.acquire()
786 res = wait(TIMEOUT1)
787 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000788 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000789 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
790
791
792class _TestEvent(BaseTestCase):
793
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000794 @classmethod
795 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000796 time.sleep(TIMEOUT2)
797 event.set()
798
799 def test_event(self):
800 event = self.Event()
801 wait = TimingWrapper(event.wait)
802
Ezio Melotti13925002011-03-16 11:05:33 +0200803 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +0000804 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +0000805 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000806
Benjamin Peterson965ce872009-04-05 21:24:58 +0000807 # Removed, threading.Event.wait() will return the value of the __flag
808 # instead of None. API Shear with the semaphore backed mp.Event
809 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000810 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000811 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000812 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
813
814 event.set()
815
816 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +0000817 self.assertEqual(event.is_set(), True)
818 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000819 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000820 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000821 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
822 # self.assertEqual(event.is_set(), True)
823
824 event.clear()
825
826 #self.assertEqual(event.is_set(), False)
827
828 self.Process(target=self._test_event, args=(event,)).start()
Benjamin Peterson965ce872009-04-05 21:24:58 +0000829 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000830
831#
832#
833#
834
835class _TestValue(BaseTestCase):
836
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000837 ALLOWED_TYPES = ('processes',)
838
Benjamin Petersone711caf2008-06-11 16:44:04 +0000839 codes_values = [
840 ('i', 4343, 24234),
841 ('d', 3.625, -4.25),
842 ('h', -232, 234),
843 ('c', latin('x'), latin('y'))
844 ]
845
Antoine Pitrou7744e2a2010-11-22 16:26:21 +0000846 def setUp(self):
847 if not HAS_SHAREDCTYPES:
848 self.skipTest("requires multiprocessing.sharedctypes")
849
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000850 @classmethod
851 def _test(cls, values):
852 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000853 sv.value = cv[2]
854
855
856 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000857 if raw:
858 values = [self.RawValue(code, value)
859 for code, value, _ in self.codes_values]
860 else:
861 values = [self.Value(code, value)
862 for code, value, _ in self.codes_values]
863
864 for sv, cv in zip(values, self.codes_values):
865 self.assertEqual(sv.value, cv[1])
866
867 proc = self.Process(target=self._test, args=(values,))
868 proc.start()
869 proc.join()
870
871 for sv, cv in zip(values, self.codes_values):
872 self.assertEqual(sv.value, cv[2])
873
874 def test_rawvalue(self):
875 self.test_value(raw=True)
876
877 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000878 val1 = self.Value('i', 5)
879 lock1 = val1.get_lock()
880 obj1 = val1.get_obj()
881
882 val2 = self.Value('i', 5, lock=None)
883 lock2 = val2.get_lock()
884 obj2 = val2.get_obj()
885
886 lock = self.Lock()
887 val3 = self.Value('i', 5, lock=lock)
888 lock3 = val3.get_lock()
889 obj3 = val3.get_obj()
890 self.assertEqual(lock, lock3)
891
Jesse Nollerb0516a62009-01-18 03:11:38 +0000892 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000893 self.assertFalse(hasattr(arr4, 'get_lock'))
894 self.assertFalse(hasattr(arr4, 'get_obj'))
895
Jesse Nollerb0516a62009-01-18 03:11:38 +0000896 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
897
898 arr5 = self.RawValue('i', 5)
899 self.assertFalse(hasattr(arr5, 'get_lock'))
900 self.assertFalse(hasattr(arr5, 'get_obj'))
901
Benjamin Petersone711caf2008-06-11 16:44:04 +0000902
903class _TestArray(BaseTestCase):
904
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000905 ALLOWED_TYPES = ('processes',)
906
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000907 @classmethod
908 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000909 for i in range(1, len(seq)):
910 seq[i] += seq[i-1]
911
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000912 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000913 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000914 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
915 if raw:
916 arr = self.RawArray('i', seq)
917 else:
918 arr = self.Array('i', seq)
919
920 self.assertEqual(len(arr), len(seq))
921 self.assertEqual(arr[3], seq[3])
922 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
923
924 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
925
926 self.assertEqual(list(arr[:]), seq)
927
928 self.f(seq)
929
930 p = self.Process(target=self.f, args=(arr,))
931 p.start()
932 p.join()
933
934 self.assertEqual(list(arr[:]), seq)
935
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000936 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +0000937 def test_array_from_size(self):
938 size = 10
939 # Test for zeroing (see issue #11675).
940 # The repetition below strengthens the test by increasing the chances
941 # of previously allocated non-zero memory being used for the new array
942 # on the 2nd and 3rd loops.
943 for _ in range(3):
944 arr = self.Array('i', size)
945 self.assertEqual(len(arr), size)
946 self.assertEqual(list(arr), [0] * size)
947 arr[:] = range(10)
948 self.assertEqual(list(arr), list(range(10)))
949 del arr
950
951 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000952 def test_rawarray(self):
953 self.test_array(raw=True)
954
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000955 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000956 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000957 arr1 = self.Array('i', list(range(10)))
958 lock1 = arr1.get_lock()
959 obj1 = arr1.get_obj()
960
961 arr2 = self.Array('i', list(range(10)), lock=None)
962 lock2 = arr2.get_lock()
963 obj2 = arr2.get_obj()
964
965 lock = self.Lock()
966 arr3 = self.Array('i', list(range(10)), lock=lock)
967 lock3 = arr3.get_lock()
968 obj3 = arr3.get_obj()
969 self.assertEqual(lock, lock3)
970
Jesse Nollerb0516a62009-01-18 03:11:38 +0000971 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000972 self.assertFalse(hasattr(arr4, 'get_lock'))
973 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +0000974 self.assertRaises(AttributeError,
975 self.Array, 'i', range(10), lock='notalock')
976
977 arr5 = self.RawArray('i', range(10))
978 self.assertFalse(hasattr(arr5, 'get_lock'))
979 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000980
981#
982#
983#
984
985class _TestContainers(BaseTestCase):
986
987 ALLOWED_TYPES = ('manager',)
988
989 def test_list(self):
990 a = self.list(list(range(10)))
991 self.assertEqual(a[:], list(range(10)))
992
993 b = self.list()
994 self.assertEqual(b[:], [])
995
996 b.extend(list(range(5)))
997 self.assertEqual(b[:], list(range(5)))
998
999 self.assertEqual(b[2], 2)
1000 self.assertEqual(b[2:10], [2,3,4])
1001
1002 b *= 2
1003 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1004
1005 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1006
1007 self.assertEqual(a[:], list(range(10)))
1008
1009 d = [a, b]
1010 e = self.list(d)
1011 self.assertEqual(
1012 e[:],
1013 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1014 )
1015
1016 f = self.list([a])
1017 a.append('hello')
1018 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1019
1020 def test_dict(self):
1021 d = self.dict()
1022 indices = list(range(65, 70))
1023 for i in indices:
1024 d[i] = chr(i)
1025 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1026 self.assertEqual(sorted(d.keys()), indices)
1027 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1028 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1029
1030 def test_namespace(self):
1031 n = self.Namespace()
1032 n.name = 'Bob'
1033 n.job = 'Builder'
1034 n._hidden = 'hidden'
1035 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1036 del n.job
1037 self.assertEqual(str(n), "Namespace(name='Bob')")
1038 self.assertTrue(hasattr(n, 'name'))
1039 self.assertTrue(not hasattr(n, 'job'))
1040
1041#
1042#
1043#
1044
1045def sqr(x, wait=0.0):
1046 time.sleep(wait)
1047 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00001048
Benjamin Petersone711caf2008-06-11 16:44:04 +00001049class _TestPool(BaseTestCase):
1050
1051 def test_apply(self):
1052 papply = self.pool.apply
1053 self.assertEqual(papply(sqr, (5,)), sqr(5))
1054 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1055
1056 def test_map(self):
1057 pmap = self.pool.map
1058 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1059 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1060 list(map(sqr, list(range(100)))))
1061
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001062 def test_map_chunksize(self):
1063 try:
1064 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1065 except multiprocessing.TimeoutError:
1066 self.fail("pool.map_async with chunksize stalled on null list")
1067
Benjamin Petersone711caf2008-06-11 16:44:04 +00001068 def test_async(self):
1069 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1070 get = TimingWrapper(res.get)
1071 self.assertEqual(get(), 49)
1072 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1073
1074 def test_async_timeout(self):
1075 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1076 get = TimingWrapper(res.get)
1077 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1078 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1079
1080 def test_imap(self):
1081 it = self.pool.imap(sqr, list(range(10)))
1082 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1083
1084 it = self.pool.imap(sqr, list(range(10)))
1085 for i in range(10):
1086 self.assertEqual(next(it), i*i)
1087 self.assertRaises(StopIteration, it.__next__)
1088
1089 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1090 for i in range(1000):
1091 self.assertEqual(next(it), i*i)
1092 self.assertRaises(StopIteration, it.__next__)
1093
1094 def test_imap_unordered(self):
1095 it = self.pool.imap_unordered(sqr, list(range(1000)))
1096 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1097
1098 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1099 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1100
1101 def test_make_pool(self):
Victor Stinner2fae27b2011-06-20 17:53:35 +02001102 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1103 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1104
Benjamin Petersone711caf2008-06-11 16:44:04 +00001105 p = multiprocessing.Pool(3)
1106 self.assertEqual(3, len(p._pool))
1107 p.close()
1108 p.join()
1109
1110 def test_terminate(self):
1111 if self.TYPE == 'manager':
1112 # On Unix a forked process increfs each shared object to
1113 # which its parent process held a reference. If the
1114 # forked process gets terminated then there is likely to
1115 # be a reference leak. So to prevent
1116 # _TestZZZNumberOfObjects from failing we skip this test
1117 # when using a manager.
1118 return
1119
1120 result = self.pool.map_async(
1121 time.sleep, [0.1 for i in range(10000)], chunksize=1
1122 )
1123 self.pool.terminate()
1124 join = TimingWrapper(self.pool.join)
1125 join()
Victor Stinner900189b2011-03-24 16:39:07 +01001126 self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001127
Ask Solem2afcbf22010-11-09 20:55:52 +00001128def raising():
1129 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00001130
Ask Solem2afcbf22010-11-09 20:55:52 +00001131def unpickleable_result():
1132 return lambda: 42
1133
1134class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00001135 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00001136
1137 def test_async_error_callback(self):
1138 p = multiprocessing.Pool(2)
1139
1140 scratchpad = [None]
1141 def errback(exc):
1142 scratchpad[0] = exc
1143
1144 res = p.apply_async(raising, error_callback=errback)
1145 self.assertRaises(KeyError, res.get)
1146 self.assertTrue(scratchpad[0])
1147 self.assertIsInstance(scratchpad[0], KeyError)
1148
1149 p.close()
1150 p.join()
1151
1152 def test_unpickleable_result(self):
1153 from multiprocessing.pool import MaybeEncodingError
1154 p = multiprocessing.Pool(2)
1155
1156 # Make sure we don't lose pool processes because of encoding errors.
1157 for iteration in range(20):
1158
1159 scratchpad = [None]
1160 def errback(exc):
1161 scratchpad[0] = exc
1162
1163 res = p.apply_async(unpickleable_result, error_callback=errback)
1164 self.assertRaises(MaybeEncodingError, res.get)
1165 wrapped = scratchpad[0]
1166 self.assertTrue(wrapped)
1167 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
1168 self.assertIsNotNone(wrapped.exc)
1169 self.assertIsNotNone(wrapped.value)
1170
1171 p.close()
1172 p.join()
1173
1174class _TestPoolWorkerLifetime(BaseTestCase):
1175 ALLOWED_TYPES = ('processes', )
1176
Jesse Noller1f0b6582010-01-27 03:36:01 +00001177 def test_pool_worker_lifetime(self):
1178 p = multiprocessing.Pool(3, maxtasksperchild=10)
1179 self.assertEqual(3, len(p._pool))
1180 origworkerpids = [w.pid for w in p._pool]
1181 # Run many tasks so each worker gets replaced (hopefully)
1182 results = []
1183 for i in range(100):
1184 results.append(p.apply_async(sqr, (i, )))
1185 # Fetch the results and verify we got the right answers,
1186 # also ensuring all the tasks have completed.
1187 for (j, res) in enumerate(results):
1188 self.assertEqual(res.get(), sqr(j))
1189 # Refill the pool
1190 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001191 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02001192 # (countdown * DELTA = 5 seconds max startup process time)
1193 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00001194 while countdown and not all(w.is_alive() for w in p._pool):
1195 countdown -= 1
1196 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001197 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001198 # All pids should be assigned. See issue #7805.
1199 self.assertNotIn(None, origworkerpids)
1200 self.assertNotIn(None, finalworkerpids)
1201 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001202 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1203 p.close()
1204 p.join()
1205
Benjamin Petersone711caf2008-06-11 16:44:04 +00001206#
1207# Test that manager has expected number of shared objects left
1208#
1209
1210class _TestZZZNumberOfObjects(BaseTestCase):
1211 # Because test cases are sorted alphabetically, this one will get
1212 # run after all the other tests for the manager. It tests that
1213 # there have been no "reference leaks" for the manager's shared
1214 # objects. Note the comment in _TestPool.test_terminate().
1215 ALLOWED_TYPES = ('manager',)
1216
1217 def test_number_of_objects(self):
1218 EXPECTED_NUMBER = 1 # the pool object is still alive
1219 multiprocessing.active_children() # discard dead process objs
1220 gc.collect() # do garbage collection
1221 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001222 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001223 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001224 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001225 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001226
1227 self.assertEqual(refs, EXPECTED_NUMBER)
1228
1229#
1230# Test of creating a customized manager class
1231#
1232
1233from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1234
1235class FooBar(object):
1236 def f(self):
1237 return 'f()'
1238 def g(self):
1239 raise ValueError
1240 def _h(self):
1241 return '_h()'
1242
1243def baz():
1244 for i in range(10):
1245 yield i*i
1246
1247class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00001248 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001249 def __iter__(self):
1250 return self
1251 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001252 return self._callmethod('__next__')
1253
1254class MyManager(BaseManager):
1255 pass
1256
1257MyManager.register('Foo', callable=FooBar)
1258MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1259MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1260
1261
1262class _TestMyManager(BaseTestCase):
1263
1264 ALLOWED_TYPES = ('manager',)
1265
1266 def test_mymanager(self):
1267 manager = MyManager()
1268 manager.start()
1269
1270 foo = manager.Foo()
1271 bar = manager.Bar()
1272 baz = manager.baz()
1273
1274 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1275 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1276
1277 self.assertEqual(foo_methods, ['f', 'g'])
1278 self.assertEqual(bar_methods, ['f', '_h'])
1279
1280 self.assertEqual(foo.f(), 'f()')
1281 self.assertRaises(ValueError, foo.g)
1282 self.assertEqual(foo._callmethod('f'), 'f()')
1283 self.assertRaises(RemoteError, foo._callmethod, '_h')
1284
1285 self.assertEqual(bar.f(), 'f()')
1286 self.assertEqual(bar._h(), '_h()')
1287 self.assertEqual(bar._callmethod('f'), 'f()')
1288 self.assertEqual(bar._callmethod('_h'), '_h()')
1289
1290 self.assertEqual(list(baz), [i*i for i in range(10)])
1291
1292 manager.shutdown()
1293
1294#
1295# Test of connecting to a remote server and using xmlrpclib for serialization
1296#
1297
1298_queue = pyqueue.Queue()
1299def get_queue():
1300 return _queue
1301
1302class QueueManager(BaseManager):
1303 '''manager class used by server process'''
1304QueueManager.register('get_queue', callable=get_queue)
1305
1306class QueueManager2(BaseManager):
1307 '''manager class which specifies the same interface as QueueManager'''
1308QueueManager2.register('get_queue')
1309
1310
1311SERIALIZER = 'xmlrpclib'
1312
1313class _TestRemoteManager(BaseTestCase):
1314
1315 ALLOWED_TYPES = ('manager',)
1316
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001317 @classmethod
1318 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001319 manager = QueueManager2(
1320 address=address, authkey=authkey, serializer=SERIALIZER
1321 )
1322 manager.connect()
1323 queue = manager.get_queue()
1324 queue.put(('hello world', None, True, 2.25))
1325
1326 def test_remote(self):
1327 authkey = os.urandom(32)
1328
1329 manager = QueueManager(
1330 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1331 )
1332 manager.start()
1333
1334 p = self.Process(target=self._putter, args=(manager.address, authkey))
1335 p.start()
1336
1337 manager2 = QueueManager2(
1338 address=manager.address, authkey=authkey, serializer=SERIALIZER
1339 )
1340 manager2.connect()
1341 queue = manager2.get_queue()
1342
1343 # Note that xmlrpclib will deserialize object as a list not a tuple
1344 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1345
1346 # Because we are using xmlrpclib for serialization instead of
1347 # pickle this will cause a serialization error.
1348 self.assertRaises(Exception, queue.put, time.sleep)
1349
1350 # Make queue finalizer run before the server is stopped
1351 del queue
1352 manager.shutdown()
1353
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001354class _TestManagerRestart(BaseTestCase):
1355
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001356 @classmethod
1357 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001358 manager = QueueManager(
1359 address=address, authkey=authkey, serializer=SERIALIZER)
1360 manager.connect()
1361 queue = manager.get_queue()
1362 queue.put('hello world')
1363
1364 def test_rapid_restart(self):
1365 authkey = os.urandom(32)
1366 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001367 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin50be1ca2010-11-01 05:10:44 +00001368 srvr = manager.get_server()
1369 addr = srvr.address
1370 # Close the connection.Listener socket which gets opened as a part
1371 # of manager.get_server(). It's not needed for the test.
1372 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001373 manager.start()
1374
1375 p = self.Process(target=self._putter, args=(manager.address, authkey))
1376 p.start()
1377 queue = manager.get_queue()
1378 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001379 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001380 manager.shutdown()
1381 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001382 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02001383 try:
1384 manager.start()
1385 except IOError as e:
1386 if e.errno != errno.EADDRINUSE:
1387 raise
1388 # Retry after some time, in case the old socket was lingering
1389 # (sporadic failure on buildbots)
1390 time.sleep(1.0)
1391 manager = QueueManager(
1392 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller35d1f002009-03-30 22:59:27 +00001393 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001394
Benjamin Petersone711caf2008-06-11 16:44:04 +00001395#
1396#
1397#
1398
1399SENTINEL = latin('')
1400
1401class _TestConnection(BaseTestCase):
1402
1403 ALLOWED_TYPES = ('processes', 'threads')
1404
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001405 @classmethod
1406 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001407 for msg in iter(conn.recv_bytes, SENTINEL):
1408 conn.send_bytes(msg)
1409 conn.close()
1410
1411 def test_connection(self):
1412 conn, child_conn = self.Pipe()
1413
1414 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001415 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001416 p.start()
1417
1418 seq = [1, 2.25, None]
1419 msg = latin('hello world')
1420 longmsg = msg * 10
1421 arr = array.array('i', list(range(4)))
1422
1423 if self.TYPE == 'processes':
1424 self.assertEqual(type(conn.fileno()), int)
1425
1426 self.assertEqual(conn.send(seq), None)
1427 self.assertEqual(conn.recv(), seq)
1428
1429 self.assertEqual(conn.send_bytes(msg), None)
1430 self.assertEqual(conn.recv_bytes(), msg)
1431
1432 if self.TYPE == 'processes':
1433 buffer = array.array('i', [0]*10)
1434 expected = list(arr) + [0] * (10 - len(arr))
1435 self.assertEqual(conn.send_bytes(arr), None)
1436 self.assertEqual(conn.recv_bytes_into(buffer),
1437 len(arr) * buffer.itemsize)
1438 self.assertEqual(list(buffer), expected)
1439
1440 buffer = array.array('i', [0]*10)
1441 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1442 self.assertEqual(conn.send_bytes(arr), None)
1443 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1444 len(arr) * buffer.itemsize)
1445 self.assertEqual(list(buffer), expected)
1446
1447 buffer = bytearray(latin(' ' * 40))
1448 self.assertEqual(conn.send_bytes(longmsg), None)
1449 try:
1450 res = conn.recv_bytes_into(buffer)
1451 except multiprocessing.BufferTooShort as e:
1452 self.assertEqual(e.args, (longmsg,))
1453 else:
1454 self.fail('expected BufferTooShort, got %s' % res)
1455
1456 poll = TimingWrapper(conn.poll)
1457
1458 self.assertEqual(poll(), False)
1459 self.assertTimingAlmostEqual(poll.elapsed, 0)
1460
1461 self.assertEqual(poll(TIMEOUT1), False)
1462 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1463
1464 conn.send(None)
1465
1466 self.assertEqual(poll(TIMEOUT1), True)
1467 self.assertTimingAlmostEqual(poll.elapsed, 0)
1468
1469 self.assertEqual(conn.recv(), None)
1470
1471 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1472 conn.send_bytes(really_big_msg)
1473 self.assertEqual(conn.recv_bytes(), really_big_msg)
1474
1475 conn.send_bytes(SENTINEL) # tell child to quit
1476 child_conn.close()
1477
1478 if self.TYPE == 'processes':
1479 self.assertEqual(conn.readable, True)
1480 self.assertEqual(conn.writable, True)
1481 self.assertRaises(EOFError, conn.recv)
1482 self.assertRaises(EOFError, conn.recv_bytes)
1483
1484 p.join()
1485
1486 def test_duplex_false(self):
1487 reader, writer = self.Pipe(duplex=False)
1488 self.assertEqual(writer.send(1), None)
1489 self.assertEqual(reader.recv(), 1)
1490 if self.TYPE == 'processes':
1491 self.assertEqual(reader.readable, True)
1492 self.assertEqual(reader.writable, False)
1493 self.assertEqual(writer.readable, False)
1494 self.assertEqual(writer.writable, True)
1495 self.assertRaises(IOError, reader.send, 2)
1496 self.assertRaises(IOError, writer.recv)
1497 self.assertRaises(IOError, writer.poll)
1498
1499 def test_spawn_close(self):
1500 # We test that a pipe connection can be closed by parent
1501 # process immediately after child is spawned. On Windows this
1502 # would have sometimes failed on old versions because
1503 # child_conn would be closed before the child got a chance to
1504 # duplicate it.
1505 conn, child_conn = self.Pipe()
1506
1507 p = self.Process(target=self._echo, args=(child_conn,))
1508 p.start()
1509 child_conn.close() # this might complete before child initializes
1510
1511 msg = latin('hello')
1512 conn.send_bytes(msg)
1513 self.assertEqual(conn.recv_bytes(), msg)
1514
1515 conn.send_bytes(SENTINEL)
1516 conn.close()
1517 p.join()
1518
1519 def test_sendbytes(self):
1520 if self.TYPE != 'processes':
1521 return
1522
1523 msg = latin('abcdefghijklmnopqrstuvwxyz')
1524 a, b = self.Pipe()
1525
1526 a.send_bytes(msg)
1527 self.assertEqual(b.recv_bytes(), msg)
1528
1529 a.send_bytes(msg, 5)
1530 self.assertEqual(b.recv_bytes(), msg[5:])
1531
1532 a.send_bytes(msg, 7, 8)
1533 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1534
1535 a.send_bytes(msg, 26)
1536 self.assertEqual(b.recv_bytes(), latin(''))
1537
1538 a.send_bytes(msg, 26, 0)
1539 self.assertEqual(b.recv_bytes(), latin(''))
1540
1541 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1542
1543 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1544
1545 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1546
1547 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1548
1549 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1550
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001551 @classmethod
1552 def _is_fd_assigned(cls, fd):
1553 try:
1554 os.fstat(fd)
1555 except OSError as e:
1556 if e.errno == errno.EBADF:
1557 return False
1558 raise
1559 else:
1560 return True
1561
1562 @classmethod
1563 def _writefd(cls, conn, data, create_dummy_fds=False):
1564 if create_dummy_fds:
1565 for i in range(0, 256):
1566 if not cls._is_fd_assigned(i):
1567 os.dup2(conn.fileno(), i)
1568 fd = reduction.recv_handle(conn)
1569 if msvcrt:
1570 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1571 os.write(fd, data)
1572 os.close(fd)
1573
1574 def test_fd_transfer(self):
1575 if self.TYPE != 'processes':
1576 self.skipTest("only makes sense with processes")
1577 conn, child_conn = self.Pipe(duplex=True)
1578
1579 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
1580 p.start()
1581 with open(test.support.TESTFN, "wb") as f:
1582 fd = f.fileno()
1583 if msvcrt:
1584 fd = msvcrt.get_osfhandle(fd)
1585 reduction.send_handle(conn, fd, p.pid)
1586 p.join()
1587 with open(test.support.TESTFN, "rb") as f:
1588 self.assertEqual(f.read(), b"foo")
1589
1590 @unittest.skipIf(sys.platform == "win32",
1591 "test semantics don't make sense on Windows")
1592 @unittest.skipIf(MAXFD <= 256,
1593 "largest assignable fd number is too small")
1594 @unittest.skipUnless(hasattr(os, "dup2"),
1595 "test needs os.dup2()")
1596 def test_large_fd_transfer(self):
1597 # With fd > 256 (issue #11657)
1598 if self.TYPE != 'processes':
1599 self.skipTest("only makes sense with processes")
1600 conn, child_conn = self.Pipe(duplex=True)
1601
1602 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
1603 p.start()
1604 with open(test.support.TESTFN, "wb") as f:
1605 fd = f.fileno()
1606 for newfd in range(256, MAXFD):
1607 if not self._is_fd_assigned(newfd):
1608 break
1609 else:
1610 self.fail("could not find an unassigned large file descriptor")
1611 os.dup2(fd, newfd)
1612 try:
1613 reduction.send_handle(conn, newfd, p.pid)
1614 finally:
1615 os.close(newfd)
1616 p.join()
1617 with open(test.support.TESTFN, "rb") as f:
1618 self.assertEqual(f.read(), b"bar")
1619
1620
Benjamin Petersone711caf2008-06-11 16:44:04 +00001621class _TestListenerClient(BaseTestCase):
1622
1623 ALLOWED_TYPES = ('processes', 'threads')
1624
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001625 @classmethod
1626 def _test(cls, address):
1627 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001628 conn.send('hello')
1629 conn.close()
1630
1631 def test_listener_client(self):
1632 for family in self.connection.families:
1633 l = self.connection.Listener(family=family)
1634 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001635 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001636 p.start()
1637 conn = l.accept()
1638 self.assertEqual(conn.recv(), 'hello')
1639 p.join()
1640 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001641#
1642# Test of sending connection and socket objects between processes
1643#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001644"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001645class _TestPicklingConnections(BaseTestCase):
1646
1647 ALLOWED_TYPES = ('processes',)
1648
1649 def _listener(self, conn, families):
1650 for fam in families:
1651 l = self.connection.Listener(family=fam)
1652 conn.send(l.address)
1653 new_conn = l.accept()
1654 conn.send(new_conn)
1655
1656 if self.TYPE == 'processes':
1657 l = socket.socket()
1658 l.bind(('localhost', 0))
1659 conn.send(l.getsockname())
1660 l.listen(1)
1661 new_conn, addr = l.accept()
1662 conn.send(new_conn)
1663
1664 conn.recv()
1665
1666 def _remote(self, conn):
1667 for (address, msg) in iter(conn.recv, None):
1668 client = self.connection.Client(address)
1669 client.send(msg.upper())
1670 client.close()
1671
1672 if self.TYPE == 'processes':
1673 address, msg = conn.recv()
1674 client = socket.socket()
1675 client.connect(address)
1676 client.sendall(msg.upper())
1677 client.close()
1678
1679 conn.close()
1680
1681 def test_pickling(self):
1682 try:
1683 multiprocessing.allow_connection_pickling()
1684 except ImportError:
1685 return
1686
1687 families = self.connection.families
1688
1689 lconn, lconn0 = self.Pipe()
1690 lp = self.Process(target=self._listener, args=(lconn0, families))
1691 lp.start()
1692 lconn0.close()
1693
1694 rconn, rconn0 = self.Pipe()
1695 rp = self.Process(target=self._remote, args=(rconn0,))
1696 rp.start()
1697 rconn0.close()
1698
1699 for fam in families:
1700 msg = ('This connection uses family %s' % fam).encode('ascii')
1701 address = lconn.recv()
1702 rconn.send((address, msg))
1703 new_conn = lconn.recv()
1704 self.assertEqual(new_conn.recv(), msg.upper())
1705
1706 rconn.send(None)
1707
1708 if self.TYPE == 'processes':
1709 msg = latin('This connection uses a normal socket')
1710 address = lconn.recv()
1711 rconn.send((address, msg))
1712 if hasattr(socket, 'fromfd'):
1713 new_conn = lconn.recv()
1714 self.assertEqual(new_conn.recv(100), msg.upper())
1715 else:
1716 # XXX On Windows with Py2.6 need to backport fromfd()
1717 discard = lconn.recv_bytes()
1718
1719 lconn.send(None)
1720
1721 rconn.close()
1722 lconn.close()
1723
1724 lp.join()
1725 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001726"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001727#
1728#
1729#
1730
1731class _TestHeap(BaseTestCase):
1732
1733 ALLOWED_TYPES = ('processes',)
1734
1735 def test_heap(self):
1736 iterations = 5000
1737 maxblocks = 50
1738 blocks = []
1739
1740 # create and destroy lots of blocks of different sizes
1741 for i in range(iterations):
1742 size = int(random.lognormvariate(0, 1) * 1000)
1743 b = multiprocessing.heap.BufferWrapper(size)
1744 blocks.append(b)
1745 if len(blocks) > maxblocks:
1746 i = random.randrange(maxblocks)
1747 del blocks[i]
1748
1749 # get the heap object
1750 heap = multiprocessing.heap.BufferWrapper._heap
1751
1752 # verify the state of the heap
1753 all = []
1754 occupied = 0
Charles-François Natali778db492011-07-02 14:35:49 +02001755 heap._lock.acquire()
1756 self.addCleanup(heap._lock.release)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001757 for L in list(heap._len_to_seq.values()):
1758 for arena, start, stop in L:
1759 all.append((heap._arenas.index(arena), start, stop,
1760 stop-start, 'free'))
1761 for arena, start, stop in heap._allocated_blocks:
1762 all.append((heap._arenas.index(arena), start, stop,
1763 stop-start, 'occupied'))
1764 occupied += (stop-start)
1765
1766 all.sort()
1767
1768 for i in range(len(all)-1):
1769 (arena, start, stop) = all[i][:3]
1770 (narena, nstart, nstop) = all[i+1][:3]
1771 self.assertTrue((arena != narena and nstart == 0) or
1772 (stop == nstart))
1773
Charles-François Natali778db492011-07-02 14:35:49 +02001774 def test_free_from_gc(self):
1775 # Check that freeing of blocks by the garbage collector doesn't deadlock
1776 # (issue #12352).
1777 # Make sure the GC is enabled, and set lower collection thresholds to
1778 # make collections more frequent (and increase the probability of
1779 # deadlock).
1780 if not gc.isenabled():
1781 gc.enable()
1782 self.addCleanup(gc.disable)
1783 thresholds = gc.get_threshold()
1784 self.addCleanup(gc.set_threshold, *thresholds)
1785 gc.set_threshold(10)
1786
1787 # perform numerous block allocations, with cyclic references to make
1788 # sure objects are collected asynchronously by the gc
1789 for i in range(5000):
1790 a = multiprocessing.heap.BufferWrapper(1)
1791 b = multiprocessing.heap.BufferWrapper(1)
1792 # circular references
1793 a.buddy = b
1794 b.buddy = a
1795
Benjamin Petersone711caf2008-06-11 16:44:04 +00001796#
1797#
1798#
1799
Benjamin Petersone711caf2008-06-11 16:44:04 +00001800class _Foo(Structure):
1801 _fields_ = [
1802 ('x', c_int),
1803 ('y', c_double)
1804 ]
1805
1806class _TestSharedCTypes(BaseTestCase):
1807
1808 ALLOWED_TYPES = ('processes',)
1809
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00001810 def setUp(self):
1811 if not HAS_SHAREDCTYPES:
1812 self.skipTest("requires multiprocessing.sharedctypes")
1813
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001814 @classmethod
1815 def _double(cls, x, y, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001816 x.value *= 2
1817 y.value *= 2
1818 foo.x *= 2
1819 foo.y *= 2
1820 string.value *= 2
1821 for i in range(len(arr)):
1822 arr[i] *= 2
1823
1824 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001825 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001826 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001827 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001828 arr = self.Array('d', list(range(10)), lock=lock)
1829 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00001830 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001831
1832 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1833 p.start()
1834 p.join()
1835
1836 self.assertEqual(x.value, 14)
1837 self.assertAlmostEqual(y.value, 2.0/3.0)
1838 self.assertEqual(foo.x, 6)
1839 self.assertAlmostEqual(foo.y, 4.0)
1840 for i in range(10):
1841 self.assertAlmostEqual(arr[i], i*2)
1842 self.assertEqual(string.value, latin('hellohello'))
1843
1844 def test_synchronize(self):
1845 self.test_sharedctypes(lock=True)
1846
1847 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001848 foo = _Foo(2, 5.0)
Brian Curtinafa88b52010-10-07 01:12:19 +00001849 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001850 foo.x = 0
1851 foo.y = 0
1852 self.assertEqual(bar.x, 2)
1853 self.assertAlmostEqual(bar.y, 5.0)
1854
1855#
1856#
1857#
1858
1859class _TestFinalize(BaseTestCase):
1860
1861 ALLOWED_TYPES = ('processes',)
1862
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001863 @classmethod
1864 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001865 class Foo(object):
1866 pass
1867
1868 a = Foo()
1869 util.Finalize(a, conn.send, args=('a',))
1870 del a # triggers callback for a
1871
1872 b = Foo()
1873 close_b = util.Finalize(b, conn.send, args=('b',))
1874 close_b() # triggers callback for b
1875 close_b() # does nothing because callback has already been called
1876 del b # does nothing because callback has already been called
1877
1878 c = Foo()
1879 util.Finalize(c, conn.send, args=('c',))
1880
1881 d10 = Foo()
1882 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1883
1884 d01 = Foo()
1885 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1886 d02 = Foo()
1887 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1888 d03 = Foo()
1889 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1890
1891 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1892
1893 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1894
Ezio Melotti13925002011-03-16 11:05:33 +02001895 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00001896 # garbage collecting locals
1897 util._exit_function()
1898 conn.close()
1899 os._exit(0)
1900
1901 def test_finalize(self):
1902 conn, child_conn = self.Pipe()
1903
1904 p = self.Process(target=self._test_finalize, args=(child_conn,))
1905 p.start()
1906 p.join()
1907
1908 result = [obj for obj in iter(conn.recv, 'STOP')]
1909 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1910
1911#
1912# Test that from ... import * works for each module
1913#
1914
1915class _TestImportStar(BaseTestCase):
1916
1917 ALLOWED_TYPES = ('processes',)
1918
1919 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001920 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00001921 'multiprocessing', 'multiprocessing.connection',
1922 'multiprocessing.heap', 'multiprocessing.managers',
1923 'multiprocessing.pool', 'multiprocessing.process',
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001924 'multiprocessing.reduction',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001925 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001926 ]
1927
1928 if c_int is not None:
1929 # This module requires _ctypes
1930 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001931
1932 for name in modules:
1933 __import__(name)
1934 mod = sys.modules[name]
1935
1936 for attr in getattr(mod, '__all__', ()):
1937 self.assertTrue(
1938 hasattr(mod, attr),
1939 '%r does not have attribute %r' % (mod, attr)
1940 )
1941
1942#
1943# Quick test that logging works -- does not test logging output
1944#
1945
1946class _TestLogging(BaseTestCase):
1947
1948 ALLOWED_TYPES = ('processes',)
1949
1950 def test_enable_logging(self):
1951 logger = multiprocessing.get_logger()
1952 logger.setLevel(util.SUBWARNING)
1953 self.assertTrue(logger is not None)
1954 logger.debug('this will not be printed')
1955 logger.info('nor will this')
1956 logger.setLevel(LOG_LEVEL)
1957
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001958 @classmethod
1959 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001960 logger = multiprocessing.get_logger()
1961 conn.send(logger.getEffectiveLevel())
1962
1963 def test_level(self):
1964 LEVEL1 = 32
1965 LEVEL2 = 37
1966
1967 logger = multiprocessing.get_logger()
1968 root_logger = logging.getLogger()
1969 root_level = root_logger.level
1970
1971 reader, writer = multiprocessing.Pipe(duplex=False)
1972
1973 logger.setLevel(LEVEL1)
1974 self.Process(target=self._test_level, args=(writer,)).start()
1975 self.assertEqual(LEVEL1, reader.recv())
1976
1977 logger.setLevel(logging.NOTSET)
1978 root_logger.setLevel(LEVEL2)
1979 self.Process(target=self._test_level, args=(writer,)).start()
1980 self.assertEqual(LEVEL2, reader.recv())
1981
1982 root_logger.setLevel(root_level)
1983 logger.setLevel(level=LOG_LEVEL)
1984
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001985
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00001986# class _TestLoggingProcessName(BaseTestCase):
1987#
1988# def handle(self, record):
1989# assert record.processName == multiprocessing.current_process().name
1990# self.__handled = True
1991#
1992# def test_logging(self):
1993# handler = logging.Handler()
1994# handler.handle = self.handle
1995# self.__handled = False
1996# # Bypass getLogger() and side-effects
1997# logger = logging.getLoggerClass()(
1998# 'multiprocessing.test.TestLoggingProcessName')
1999# logger.addHandler(handler)
2000# logger.propagate = False
2001#
2002# logger.warn('foo')
2003# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002004
Benjamin Petersone711caf2008-06-11 16:44:04 +00002005#
Jesse Noller6214edd2009-01-19 16:23:53 +00002006# Test to verify handle verification, see issue 3321
2007#
2008
2009class TestInvalidHandle(unittest.TestCase):
2010
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002011 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00002012 def test_invalid_handles(self):
Jesse Noller6214edd2009-01-19 16:23:53 +00002013 conn = _multiprocessing.Connection(44977608)
2014 self.assertRaises(IOError, conn.poll)
2015 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002016
Jesse Noller6214edd2009-01-19 16:23:53 +00002017#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002018# Functions used to create test cases from the base ones in this module
2019#
2020
2021def get_attributes(Source, names):
2022 d = {}
2023 for name in names:
2024 obj = getattr(Source, name)
2025 if type(obj) == type(get_attributes):
2026 obj = staticmethod(obj)
2027 d[name] = obj
2028 return d
2029
2030def create_test_cases(Mixin, type):
2031 result = {}
2032 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002033 Type = type.capitalize()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002034
2035 for name in list(glob.keys()):
2036 if name.startswith('_Test'):
2037 base = glob[name]
2038 if type in base.ALLOWED_TYPES:
2039 newname = 'With' + Type + name[1:]
2040 class Temp(base, unittest.TestCase, Mixin):
2041 pass
2042 result[newname] = Temp
2043 Temp.__name__ = newname
2044 Temp.__module__ = Mixin.__module__
2045 return result
2046
2047#
2048# Create test cases
2049#
2050
2051class ProcessesMixin(object):
2052 TYPE = 'processes'
2053 Process = multiprocessing.Process
2054 locals().update(get_attributes(multiprocessing, (
2055 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2056 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2057 'RawArray', 'current_process', 'active_children', 'Pipe',
2058 'connection', 'JoinableQueue'
2059 )))
2060
2061testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2062globals().update(testcases_processes)
2063
2064
2065class ManagerMixin(object):
2066 TYPE = 'manager'
2067 Process = multiprocessing.Process
2068 manager = object.__new__(multiprocessing.managers.SyncManager)
2069 locals().update(get_attributes(manager, (
2070 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2071 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
2072 'Namespace', 'JoinableQueue'
2073 )))
2074
2075testcases_manager = create_test_cases(ManagerMixin, type='manager')
2076globals().update(testcases_manager)
2077
2078
2079class ThreadsMixin(object):
2080 TYPE = 'threads'
2081 Process = multiprocessing.dummy.Process
2082 locals().update(get_attributes(multiprocessing.dummy, (
2083 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2084 'Condition', 'Event', 'Value', 'Array', 'current_process',
2085 'active_children', 'Pipe', 'connection', 'dict', 'list',
2086 'Namespace', 'JoinableQueue'
2087 )))
2088
2089testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2090globals().update(testcases_threads)
2091
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002092class OtherTest(unittest.TestCase):
2093 # TODO: add more tests for deliver/answer challenge.
2094 def test_deliver_challenge_auth_failure(self):
2095 class _FakeConnection(object):
2096 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00002097 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002098 def send_bytes(self, data):
2099 pass
2100 self.assertRaises(multiprocessing.AuthenticationError,
2101 multiprocessing.connection.deliver_challenge,
2102 _FakeConnection(), b'abc')
2103
2104 def test_answer_challenge_auth_failure(self):
2105 class _FakeConnection(object):
2106 def __init__(self):
2107 self.count = 0
2108 def recv_bytes(self, size):
2109 self.count += 1
2110 if self.count == 1:
2111 return multiprocessing.connection.CHALLENGE
2112 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00002113 return b'something bogus'
2114 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002115 def send_bytes(self, data):
2116 pass
2117 self.assertRaises(multiprocessing.AuthenticationError,
2118 multiprocessing.connection.answer_challenge,
2119 _FakeConnection(), b'abc')
2120
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002121#
2122# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2123#
2124
2125def initializer(ns):
2126 ns.test += 1
2127
2128class TestInitializers(unittest.TestCase):
2129 def setUp(self):
2130 self.mgr = multiprocessing.Manager()
2131 self.ns = self.mgr.Namespace()
2132 self.ns.test = 0
2133
2134 def tearDown(self):
2135 self.mgr.shutdown()
2136
2137 def test_manager_initializer(self):
2138 m = multiprocessing.managers.SyncManager()
2139 self.assertRaises(TypeError, m.start, 1)
2140 m.start(initializer, (self.ns,))
2141 self.assertEqual(self.ns.test, 1)
2142 m.shutdown()
2143
2144 def test_pool_initializer(self):
2145 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2146 p = multiprocessing.Pool(1, initializer, (self.ns,))
2147 p.close()
2148 p.join()
2149 self.assertEqual(self.ns.test, 1)
2150
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002151#
2152# Issue 5155, 5313, 5331: Test process in processes
2153# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2154#
2155
2156def _ThisSubProcess(q):
2157 try:
2158 item = q.get(block=False)
2159 except pyqueue.Empty:
2160 pass
2161
2162def _TestProcess(q):
2163 queue = multiprocessing.Queue()
2164 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
2165 subProc.start()
2166 subProc.join()
2167
2168def _afunc(x):
2169 return x*x
2170
2171def pool_in_process():
2172 pool = multiprocessing.Pool(processes=4)
2173 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2174
2175class _file_like(object):
2176 def __init__(self, delegate):
2177 self._delegate = delegate
2178 self._pid = None
2179
2180 @property
2181 def cache(self):
2182 pid = os.getpid()
2183 # There are no race conditions since fork keeps only the running thread
2184 if pid != self._pid:
2185 self._pid = pid
2186 self._cache = []
2187 return self._cache
2188
2189 def write(self, data):
2190 self.cache.append(data)
2191
2192 def flush(self):
2193 self._delegate.write(''.join(self.cache))
2194 self._cache = []
2195
2196class TestStdinBadfiledescriptor(unittest.TestCase):
2197
2198 def test_queue_in_process(self):
2199 queue = multiprocessing.Queue()
2200 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
2201 proc.start()
2202 proc.join()
2203
2204 def test_pool_in_process(self):
2205 p = multiprocessing.Process(target=pool_in_process)
2206 p.start()
2207 p.join()
2208
2209 def test_flushing(self):
2210 sio = io.StringIO()
2211 flike = _file_like(sio)
2212 flike.write('foo')
2213 proc = multiprocessing.Process(target=lambda: flike.flush())
2214 flike.flush()
2215 assert sio.getvalue() == 'foo'
2216
2217testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
2218 TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002219
Benjamin Petersone711caf2008-06-11 16:44:04 +00002220#
2221#
2222#
2223
2224def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00002225 if sys.platform.startswith("linux"):
2226 try:
2227 lock = multiprocessing.RLock()
2228 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00002229 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00002230
Benjamin Petersone711caf2008-06-11 16:44:04 +00002231 if run is None:
2232 from test.support import run_unittest as run
2233
2234 util.get_temp_dir() # creates temp directory for use by all processes
2235
2236 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2237
Benjamin Peterson41181742008-07-02 20:22:54 +00002238 ProcessesMixin.pool = multiprocessing.Pool(4)
2239 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2240 ManagerMixin.manager.__init__()
2241 ManagerMixin.manager.start()
2242 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002243
2244 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00002245 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2246 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002247 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2248 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00002249 )
2250
2251 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2252 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
2253 run(suite)
2254
Benjamin Peterson41181742008-07-02 20:22:54 +00002255 ThreadsMixin.pool.terminate()
2256 ProcessesMixin.pool.terminate()
2257 ManagerMixin.pool.terminate()
2258 ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002259
Benjamin Peterson41181742008-07-02 20:22:54 +00002260 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002261
2262def main():
2263 test_main(unittest.TextTestRunner(verbosity=2).run)
2264
2265if __name__ == '__main__':
2266 main()