#!/usr/bin/env python3

#
# Unit tests for the multiprocessing package
#

import unittest
import queue as pyqueue
import time
import io
import sys
import os
import gc
import errno
import signal
import array
import socket
import random
import logging
import test.support


# Skip tests if _multiprocessing wasn't built.
_multiprocessing = test.support.import_module('_multiprocessing')
# Skip tests if sem_open implementation is broken.
test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
import threading

import multiprocessing.dummy
import multiprocessing.connection
import multiprocessing.managers
import multiprocessing.heap
import multiprocessing.pool

from multiprocessing import util

# sharedctypes is optional; tests that need it check HAS_SHAREDCTYPES.
try:
    from multiprocessing.sharedctypes import Value, copy
    HAS_SHAREDCTYPES = True
except ImportError:
    HAS_SHAREDCTYPES = False
#
#
#

def latin(s):
    """Return the str *s* encoded to bytes using the 'latin' codec."""
    return s.encode('latin')
#
# Constants
#

LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# True unless the extension module flags sem_getvalue() as broken.
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = (sys.platform == "win32")
#
# Some tests require ctypes
#

try:
    from ctypes import Structure, c_int, c_double
except ImportError:
    # Placeholders so the module still imports; tests check "c_int is None".
    Structure = object
    c_int = c_double = None
#
# Creates a wrapper for a function which records the time it takes to finish
#

class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    ``elapsed`` is None until the first call.  After every call, including
    one that raises, it holds the duration in seconds measured with
    time.time().
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        """Call the wrapped function and return its result."""
        t = time.time()
        try:
            return self.func(*args, **kwds)
        finally:
            # Runs on both normal return and exception.
            self.elapsed = time.time() - t
#
# Base class for test cases
#

class BaseTestCase(object):
    """Shared helpers for the test case classes in this file.

    The assert* methods used here (assertAlmostEqual, assertEqual) are not
    defined in this class; they are expected to come from another base of
    the concrete test class.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timings are only compared when CHECK_TIMINGS is enabled.
        if CHECK_TIMINGS:
            self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Assert func(*args) == value, unless func raises NotImplementedError,
        # in which case the check is silently skipped.
        try:
            res = func(*args)
        except NotImplementedError:
            pass
        else:
            return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
#
# Return the value of a semaphore
#

def get_value(self):
    """Return the current counter value of the semaphore-like object *self*.

    Tries, in order, a ``get_value()`` method, then the attributes
    ``_Semaphore__value`` and ``_value``.  Raises NotImplementedError when
    none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        try:
            return self._Semaphore__value
        except AttributeError:
            try:
                return self._value
            except AttributeError:
                raise NotImplementedError
#
# Testcases
#

class _TestProcess(BaseTestCase):
    """Tests for Process objects: attributes, start/join, terminate, nesting.

    Attributes such as self.TYPE, self.Process, self.Queue, self.Pipe,
    self.current_process and self.active_children are not defined here;
    presumably a mixin elsewhere in the file supplies them.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def test_daemon_argument(self):
        if self.TYPE == "threads":
            return

        # By default uses the current process's daemon flag.
        proc0 = self.Process(target=self._test)
        self.assertEqual(proc0.daemon, self.current_process().daemon)
        proc1 = self.Process(target=self._test, daemon=True)
        self.assertTrue(proc1.daemon)
        proc2 = self.Process(target=self._test, daemon=False)
        self.assertFalse(proc2.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Child side of test_process: report what the child sees through q.
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The queue itself is args[0], so the child reports args[1:].
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        # Sleep long enough that the parent must terminate us.
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Send our id, then spawn (and wait for) two children one level
        # deeper, until ids reach length 2.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        # Depth-first order, because each child is joined before the next
        # one is started.
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
#
#
#

class _UpperCaser(multiprocessing.Process):
    """Process subclass that upper-cases strings sent to it over a pipe.

    The parent talks through ``parent_conn``; the child's run() loop reads
    from ``child_conn`` until it receives None.
    """

    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.child_conn, self.parent_conn = multiprocessing.Pipe()

    def run(self):
        # Child side: drop the parent's end, then echo upper-cased strings
        # until the None sentinel arrives.
        self.parent_conn.close()
        for s in iter(self.child_conn.recv, None):
            self.child_conn.send(s.upper())
        self.child_conn.close()

    def submit(self, s):
        """Send the str *s* to the child and return the child's reply."""
        assert type(s) is str
        self.parent_conn.send(s)
        return self.parent_conn.recv()

    def stop(self):
        """Send the None sentinel and close both connection ends."""
        self.parent_conn.send(None)
        self.parent_conn.close()
        self.child_conn.close()
class _TestSubclassingProcess(BaseTestCase):
    """Check that a multiprocessing.Process subclass can be started and used."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        uppercaser = _UpperCaser()
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()
#
#
#

def queue_empty(q):
    """Return whether *q* is empty, via q.empty() or else q.qsize() == 0."""
    if hasattr(q, 'empty'):
        return q.empty()
    else:
        return q.qsize() == 0
def queue_full(q, maxsize):
    """Return whether *q* is full, via q.full() or else q.qsize() == maxsize."""
    if hasattr(q, 'full'):
        return q.full()
    else:
        return q.qsize() == maxsize
class _TestQueue(BaseTestCase):
    """Tests for Queue / JoinableQueue put, get, qsize, fork and task_done.

    self.Queue, self.JoinableQueue, self.Event and self.Process are not
    defined here; presumably a mixin elsewhere in the file supplies them.
    """


    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_put: drain the six items once allowed to start.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_get: enqueue 2..5 once allowed to start.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() is not available here; nothing more to check.
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker: acknowledge every item until the None sentinel arrives.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        # The former "sys.version_info < (2, 5)" guard could never be true
        # under Python 3, which made this skip unreachable.
        if not hasattr(queue, 'task_done'):
            self.skipTest("requires 'queue.task_done()' method")

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # One sentinel per worker so that each of them exits its loop.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
#
#
#

class _TestLock(BaseTestCase):
    """Tests for Lock and RLock acquire/release behaviour."""

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # Releasing an unlocked lock must fail.
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        # One release more than the number of acquires must fail.
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        with self.Lock():
            pass
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore counting and timeouts."""

    def _test_semaphore(self, sem):
        # Expects a semaphore created with value 2; leaves it back at 2.
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # An unbounded semaphore may be released past its initial value.
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
class _TestCondition(BaseTestCase):
    """Tests for Condition wait/notify/notify_all and wait timeouts."""

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Sleeper: signal "sleeping", wait on the condition, signal "woken".
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()

        # Named "t" so it no longer shadows the process above; as before,
        # only this thread is joined at the end.
        t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        t.daemon = True
        t.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        t.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        # (poll up to 10 * DELTA instead of relying on a single sleep)
        for i in range(10):
            try:
                if get_value(woken) == 6:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
class _TestEvent(BaseTestCase):
    """Tests for Event set/clear/is_set/wait, including a set from a child."""

    @classmethod
    def _test_event(cls, event):
        # Child side of test_event: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Removed temporarily, due to API shear, this does not
        # work with threading._Event objects. is_set == isSet
        self.assertEqual(event.is_set(), False)

        # Removed, threading.Event.wait() will return the value of the __flag
        # instead of None. API Shear with the semaphore backed mp.Event
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # Keep a reference to the child so it can be joined; daemonic so a
        # stuck child cannot keep the test run from exiting.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        p.join()
#
#
#

class _TestValue(BaseTestCase):
    """Tests for shared Value / RawValue objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value the child process writes)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Child side: overwrite each shared value with its third tuple item.
        for sv, cv in zip(values, cls.codes_values):
            sv.value = cv[2]


    def test_value(self, raw=False):
        if raw:
            values = [self.RawValue(code, value)
                      for code, value, _ in self.codes_values]
        else:
            values = [self.Value(code, value)
                      for code, value, _ in self.codes_values]

        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.start()
        proc.join()

        # The child's writes must be visible in the parent.
        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # The get_lock()/get_obj() results are mostly unused; calling them
        # checks that the methods exist and do not raise.
        val1 = self.Value('i', 5)
        lock1 = val1.get_lock()
        obj1 = val1.get_obj()

        val2 = self.Value('i', 5, lock=None)
        lock2 = val2.get_lock()
        obj2 = val2.get_obj()

        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        obj3 = val3.get_obj()
        self.assertEqual(lock, lock3)

        arr4 = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        arr5 = self.RawValue('i', 5)
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
class _TestArray(BaseTestCase):
    """Tests for shared Array / RawArray objects."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # Replace seq in place with its running (prefix) sums.
        for i in range(1, len(seq)):
            seq[i] += seq[i-1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        if raw:
            arr = self.RawArray('i', seq)
        else:
            arr = self.Array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        # Compute the expected result locally, then have a child process do
        # the same to the shared array.
        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # The get_lock()/get_obj() results are mostly unused; calling them
        # checks that the methods exist and do not raise.
        arr1 = self.Array('i', list(range(10)))
        lock1 = arr1.get_lock()
        obj1 = arr1.get_obj()

        arr2 = self.Array('i', list(range(10)), lock=None)
        lock2 = arr2.get_lock()
        obj2 = arr2.get_obj()

        lock = self.Lock()
        arr3 = self.Array('i', list(range(10)), lock=lock)
        lock3 = arr3.get_lock()
        obj3 = arr3.get_obj()
        self.assertEqual(lock, lock3)

        arr4 = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        arr5 = self.RawArray('i', range(10))
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
983#
984#
985#
986
class _TestContainers(BaseTestCase):
    # Tests for the manager-backed list, dict and Namespace proxies.

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        a = self.list(list(range(10)))
        self.assertEqual(a[:], list(range(10)))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2, 3, 4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        # None of the operations on b may have affected a.
        self.assertEqual(a[:], list(range(10)))

        # A shared list built from proxies compares equal to the nested
        # plain lists.
        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A later mutation of a must be visible through f.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), {i: chr(i) for i in indices})
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The expected string lists neither the deleted 'job' nor '_hidden'.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1042
1043#
1044#
1045#
1046
def sqr(x, wait=0.0):
    """Return x squared after sleeping for `wait` seconds."""
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001050
class _TestPool(BaseTestCase):
    # Tests for the apply/map/imap family on the pool supplied by the fixture.

    def test_apply(self):
        call = self.pool.apply
        self.assertEqual(call(sqr, (5,)), sqr(5))
        self.assertEqual(call(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        small = list(range(10))
        self.assertEqual(self.pool.map(sqr, small), [sqr(v) for v in small])

        large = list(range(100))
        self.assertEqual(self.pool.map(sqr, large, chunksize=20),
                         [sqr(v) for v in large])

    def test_map_chunksize(self):
        # An empty iterable with an explicit chunksize must not stall.
        try:
            pending = self.pool.map_async(sqr, [], chunksize=1)
            pending.get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        pending = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        timed_get = TimingWrapper(pending.get)
        self.assertEqual(timed_get(), 49)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout passed to get().
        pending = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        timed_get = TimingWrapper(pending.get)
        self.assertRaises(multiprocessing.TimeoutError,
                          timed_get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT2)

    def test_imap(self):
        results = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(results), [sqr(v) for v in range(10)])

        # Step through the iterator by hand, with and without a chunksize.
        for count, options in ((10, {}), (1000, {'chunksize': 100})):
            results = self.pool.imap(sqr, list(range(count)), **options)
            for value in range(count):
                self.assertEqual(next(results), value*value)
            self.assertRaises(StopIteration, results.__next__)

    def test_imap_unordered(self):
        expected = [sqr(v) for v in range(1000)]

        results = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(results), expected)

        results = self.pool.imap_unordered(sqr, list(range(1000)),
                                           chunksize=53)
        self.assertEqual(sorted(results), expected)

    def test_make_pool(self):
        pool = multiprocessing.Pool(3)
        self.assertEqual(3, len(pool._pool))
        pool.close()
        pool.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # Queue far more work than can finish, then terminate immediately.
        delays = [0.1] * 10000
        result = self.pool.map_async(time.sleep, delays, chunksize=1)
        self.pool.terminate()
        timed_join = TimingWrapper(self.pool.join)
        timed_join()
        self.assertLess(timed_join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001126
def raising():
    """Always raise KeyError("key")."""
    error = KeyError("key")
    raise error
Jesse Noller1f0b6582010-01-27 03:36:01 +00001129
def unpickleable_result():
    """Return a lambda that returns 42."""
    answer = lambda: 42
    return answer
1132
class _TestPoolWorkerErrors(BaseTestCase):
    # Tests for how a pool reports errors raised by, or on behalf of, a task.
    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        p = multiprocessing.Pool(2)
        # Close and join in a finally block so a failing assertion does not
        # leave the worker processes behind.
        try:
            scratchpad = [None]
            def errback(exc):
                scratchpad[0] = exc

            res = p.apply_async(raising, error_callback=errback)
            self.assertRaises(KeyError, res.get)
            self.assertTrue(scratchpad[0])
            self.assertIsInstance(scratchpad[0], KeyError)
        finally:
            p.close()
            p.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)
        try:
            scratchpad = [None]
            def errback(exc):
                scratchpad[0] = exc

            # Make sure we don't lose pool processes because of encoding
            # errors.
            for iteration in range(20):
                # Forget the previous iteration's exception.
                scratchpad[0] = None

                res = p.apply_async(unpickleable_result,
                                    error_callback=errback)
                self.assertRaises(MaybeEncodingError, res.get)
                wrapped = scratchpad[0]
                self.assertTrue(wrapped)
                self.assertIsInstance(wrapped, MaybeEncodingError)
                self.assertIsNotNone(wrapped.exc)
                self.assertIsNotNone(wrapped.value)
        finally:
            p.close()
            p.join()
1172
class _TestPoolWorkerLifetime(BaseTestCase):
    # Checks that workers are replaced once they hit maxtasksperchild.
    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        # Close and join in a finally block so a failing assertion does not
        # leave the worker processes behind.
        try:
            self.assertEqual(3, len(p._pool))
            origworkerpids = [w.pid for w in p._pool]
            # Run many tasks so each worker gets replaced (hopefully)
            results = [p.apply_async(sqr, (i, )) for i in range(100)]
            # Fetch the results and verify we got the right answers,
            # also ensuring all the tasks have completed.
            for (j, res) in enumerate(results):
                self.assertEqual(res.get(), sqr(j))
            # Refill the pool
            p._repopulate_pool()
            # Wait until all workers are alive
            # (countdown * DELTA = 5 seconds max startup process time)
            countdown = 50
            while countdown and not all(w.is_alive() for w in p._pool):
                countdown -= 1
                time.sleep(DELTA)
            finalworkerpids = [w.pid for w in p._pool]
            # All pids should be assigned.  See issue #7805.
            self.assertNotIn(None, origworkerpids)
            self.assertNotIn(None, finalworkerpids)
            # Finally, check that the worker pids have changed
            self.assertNotEqual(sorted(origworkerpids),
                                sorted(finalworkerpids))
        finally:
            p.close()
            p.join()
1204
Benjamin Petersone711caf2008-06-11 16:44:04 +00001205#
1206# Test that manager has expected number of shared objects left
1207#
1208
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Take the debug snapshot right after the count so both describe the
        # same moment, and print it once if the count is wrong.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1227
1228#
1229# Test of creating a customized manager class
1230#
1231
1232from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1233
class FooBar(object):
    # Plain class with one public method that returns, one public method
    # that raises, and one underscore-prefixed method.

    def _h(self):
        return '_h()'

    def g(self):
        raise ValueError

    def f(self):
        return 'f()'
1241
def baz():
    """Generate the squares of 0 through 9."""
    n = 0
    while n < 10:
        yield n * n
        n += 1
1245
class IteratorProxy(BaseProxy):
    # Proxy that implements the iterator protocol; each step is forwarded
    # through _callmethod('__next__').
    _exposed_ = ('__next__',)

    def __next__(self):
        forward = self._callmethod
        return forward('__next__')

    def __iter__(self):
        return self
1252
class MyManager(BaseManager):
    # Customized manager; its type ids are registered right below.
    pass

# 'Foo' is registered without an explicit `exposed` list.
MyManager.register('Foo', callable=FooBar)
# 'Bar' wraps the same class but exposes only f() and _h().
MyManager.register('Bar', exposed=('f', '_h'), callable=FooBar)
# 'baz' hands out the generator through the iterator proxy type.
MyManager.register('baz', proxytype=IteratorProxy, callable=baz)
1259
1260
class _TestMyManager(BaseTestCase):
    # Tests the customized MyManager and the types registered on it.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        # Shut the manager down even when an assertion fails, so its server
        # process is not left running.
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            baz = manager.baz()

            candidates = ('f', 'g', '_h')
            foo_methods = [name for name in candidates if hasattr(foo, name)]
            bar_methods = [name for name in candidates if hasattr(bar, name)]

            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            # '_h' is not exposed on 'Foo', so calling it must be refused.
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(baz), [i*i for i in range(10)])
        finally:
            manager.shutdown()
1292
1293#
1294# Test of connecting to a remote server and using xmlrpclib for serialization
1295#
1296
# The single queue object returned by get_queue().
_queue = pyqueue.Queue()

def get_queue():
    """Return the module-level queue."""
    return _queue


class QueueManager(BaseManager):
    '''Manager used on the server side; 'get_queue' returns the real queue.'''

QueueManager.register('get_queue', callable=get_queue)


class QueueManager2(BaseManager):
    '''Manager offering the same interface as QueueManager, registered
    without a callable.'''

QueueManager2.register('get_queue')
1308
1309
1310SERIALIZER = 'xmlrpclib'
1311
class _TestRemoteManager(BaseTestCase):
    # Tests a client connecting to a manager by address, using the
    # serializer named by SERIALIZER.

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        # Runs in the child: connect to the manager and put one item.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()
        # Shut the manager down even when an assertion fails, so its server
        # process is not left running.
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.start()

            manager2 = QueueManager2(
                address=manager.address, authkey=authkey,
                serializer=SERIALIZER
                )
            manager2.connect()
            queue = manager2.get_queue()

            # Note that xmlrpclib will deserialize object as a list not a
            # tuple
            self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

            # Because we are using xmlrpclib for serialization instead of
            # pickle this will cause a serialization error.
            self.assertRaises(Exception, queue.put, time.sleep)

            # The item has arrived, so the child has finished its work;
            # join it while the manager is still running.
            p.join()

            # Make queue finalizer run before the server is stopped
            del queue
        finally:
            manager.shutdown()
1352
class _TestManagerRestart(BaseTestCase):
    # Checks that a manager can be restarted on an address right after the
    # previous manager on that address was shut down.

    @classmethod
    def _putter(cls, address, authkey):
        # Runs in the child: connect to the manager and put one item.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        del queue
        # Join the child while the first manager is still running.
        p.join()
        manager.shutdown()

        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except IOError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # The replacement manager has to be started as well: shutdown()
            # only exists on a manager once start() has run.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001393
Benjamin Petersone711caf2008-06-11 16:44:04 +00001394#
1395#
1396#
1397
1398SENTINEL = latin('')
1399
class _TestConnection(BaseTestCase):
    # Tests for the connection objects returned by self.Pipe().

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Child side: send every received byte message straight back until
        # SENTINEL arrives, then close the connection.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        # The echo child returns everything sent on conn.
        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        # Round trip of an object.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        # Round trip of raw bytes.
        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into a zeroed buffer: the data lands at the start and
            # the rest of the buffer stays zero.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Same again with a second argument of three items' worth of
            # bytes: the data lands after three untouched zero items.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A 40-byte buffer is too small for longmsg; the exception is
            # expected to carry the complete message.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing is pending: poll() returns False without waiting ...
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        # ... and poll(TIMEOUT1) returns False after about TIMEOUT1.
        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # Once the echo is pending, poll(TIMEOUT1) returns True at once.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16 MiB
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            # With both copies of the child end closed, receiving is
            # expected to raise EOFError.
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        # With duplex=False the first connection only receives and the
        # second only sends.
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            # Using either end in the wrong direction must raise IOError.
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)   # tell child to quit
        conn.close()
        p.join()

    def test_sendbytes(self):
        # Only exercised for the 'processes' flavour; other types return
        # without checking anything.
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # A second argument acts as a start offset into msg.
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        # A third argument limits how many bytes are sent.
        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to len(msg) sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Offsets or sizes outside the message, or negative, are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1549
class _TestListenerClient(BaseTestCase):
    # Tests a Listener/Client pair for every available address family.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child side: connect to the listener, send one greeting, hang up.
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        make_listener = self.connection.Listener
        for family in self.connection.families:
            listener = make_listener(family=family)
            child = self.Process(target=self._test,
                                 args=(listener.address,))
            child.daemon = True
            child.start()
            accepted = listener.accept()
            greeting = accepted.recv()
            self.assertEqual(greeting, 'hello')
            child.join()
            listener.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001570#
1571# Test of sending connection and socket objects between processes
1572#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001573"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001574class _TestPicklingConnections(BaseTestCase):
1575
1576 ALLOWED_TYPES = ('processes',)
1577
1578 def _listener(self, conn, families):
1579 for fam in families:
1580 l = self.connection.Listener(family=fam)
1581 conn.send(l.address)
1582 new_conn = l.accept()
1583 conn.send(new_conn)
1584
1585 if self.TYPE == 'processes':
1586 l = socket.socket()
1587 l.bind(('localhost', 0))
1588 conn.send(l.getsockname())
1589 l.listen(1)
1590 new_conn, addr = l.accept()
1591 conn.send(new_conn)
1592
1593 conn.recv()
1594
1595 def _remote(self, conn):
1596 for (address, msg) in iter(conn.recv, None):
1597 client = self.connection.Client(address)
1598 client.send(msg.upper())
1599 client.close()
1600
1601 if self.TYPE == 'processes':
1602 address, msg = conn.recv()
1603 client = socket.socket()
1604 client.connect(address)
1605 client.sendall(msg.upper())
1606 client.close()
1607
1608 conn.close()
1609
1610 def test_pickling(self):
1611 try:
1612 multiprocessing.allow_connection_pickling()
1613 except ImportError:
1614 return
1615
1616 families = self.connection.families
1617
1618 lconn, lconn0 = self.Pipe()
1619 lp = self.Process(target=self._listener, args=(lconn0, families))
1620 lp.start()
1621 lconn0.close()
1622
1623 rconn, rconn0 = self.Pipe()
1624 rp = self.Process(target=self._remote, args=(rconn0,))
1625 rp.start()
1626 rconn0.close()
1627
1628 for fam in families:
1629 msg = ('This connection uses family %s' % fam).encode('ascii')
1630 address = lconn.recv()
1631 rconn.send((address, msg))
1632 new_conn = lconn.recv()
1633 self.assertEqual(new_conn.recv(), msg.upper())
1634
1635 rconn.send(None)
1636
1637 if self.TYPE == 'processes':
1638 msg = latin('This connection uses a normal socket')
1639 address = lconn.recv()
1640 rconn.send((address, msg))
1641 if hasattr(socket, 'fromfd'):
1642 new_conn = lconn.recv()
1643 self.assertEqual(new_conn.recv(100), msg.upper())
1644 else:
1645 # XXX On Windows with Py2.6 need to backport fromfd()
1646 discard = lconn.recv_bytes()
1647
1648 lconn.send(None)
1649
1650 rconn.close()
1651 lconn.close()
1652
1653 lp.join()
1654 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001655"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001656#
1657#
1658#
1659
class _TestHeap(BaseTestCase):
    # Consistency check of the multiprocessing.heap bookkeeping.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                # Drop a random block so that holes appear in the arenas.
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap: gather every free and every occupied
        # segment as (arena index, start, stop, length, state)
        segments = []
        for free_list in list(heap._len_to_seq.values()):
            for arena, start, stop in free_list:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Each segment must either be followed directly by the next one or
        # be the last of its arena, with the next one starting at offset 0.
        for current, following in zip(segments, segments[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1700
1701#
1702#
1703#
1704
Benjamin Petersone711caf2008-06-11 16:44:04 +00001705class _Foo(Structure):
1706 _fields_ = [
1707 ('x', c_int),
1708 ('y', c_double)
1709 ]
1710
class _TestSharedCTypes(BaseTestCase):
    # Tests for values, structures and arrays from sharedctypes.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Child side: double every shared object in place.
        for shared in (x, y, string):
            shared.value *= 2
        foo.x *= 2
        foo.y *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        shared = (x, y, foo, arr, string)
        child = self.Process(target=self._double, args=shared)
        child.start()
        child.join()

        # Every object must show the doubling done by the child.
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index, doubled in enumerate(range(0, 20, 2)):
            self.assertAlmostEqual(arr[index], doubled)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        # Changing the original afterwards must not affect the copy.
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1759
1760#
1761#
1762#
1763
class _TestFinalize(BaseTestCase):
    # Tests when and in which order util.Finalize callbacks run.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Child side: register finalizers that each send a tag over conn,
        # then run multiprocessing's exit function.  test_finalize checks the
        # order in which the tags arrive.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # Registered without an exitpriority; 'c' is not among the tags
        # test_finalize expects.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers that share exitpriority 0; test_finalize expects
        # their tags in the reverse of this registration order.
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        # Lowest priority of all: 'STOP' is what ends the read loop in
        # test_finalize.
        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.start()
        p.join()

        # Read tags until 'STOP' arrives.
        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1815
1816#
1817# Test that from ... import * works for each module
1818#
1819
class _TestImportStar(BaseTestCase):
    # Checks that every name a module lists in __all__ exists on it.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        submodules = (
            'connection', 'heap', 'managers', 'pool', 'process',
            'reduction', 'synchronize', 'util',
            )
        modules = ['multiprocessing']
        modules.extend('multiprocessing.' + sub for sub in submodules)

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for name in modules:
            __import__(name)
            mod = sys.modules[name]
            exported = getattr(mod, '__all__', ())

            for attr in exported:
                present = hasattr(mod, attr)
                self.assertTrue(
                    present,
                    '%r does not have attribute %r' % (mod, attr)
                    )
1846
1847#
1848# Quick test that logging works -- does not test logging output
1849#
1850
class _TestLogging(BaseTestCase):
    # Quick checks of the multiprocessing logger; the logging output itself
    # is not examined.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        # Check the logger before using it; afterwards the check could not
        # fail any more.
        self.assertIsNotNone(logger)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Put the level back even if logging raised.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Child side: report the effective level of the logger.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # A level set on the multiprocessing logger is seen by the child.
            logger.setLevel(LEVEL1)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            level = reader.recv()
            p.join()
            self.assertEqual(LEVEL1, level)

            # With the multiprocessing logger at NOTSET the child reports
            # the root logger's level.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            level = reader.recv()
            p.join()
            self.assertEqual(LEVEL2, level)
        finally:
            # Restore both levels and release the pipe even when an
            # assertion failed.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
1889
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001890
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00001891# class _TestLoggingProcessName(BaseTestCase):
1892#
1893# def handle(self, record):
1894# assert record.processName == multiprocessing.current_process().name
1895# self.__handled = True
1896#
1897# def test_logging(self):
1898# handler = logging.Handler()
1899# handler.handle = self.handle
1900# self.__handled = False
1901# # Bypass getLogger() and side-effects
1902# logger = logging.getLoggerClass()(
1903# 'multiprocessing.test.TestLoggingProcessName')
1904# logger.addHandler(handler)
1905# logger.propagate = False
1906#
1907# logger.warn('foo')
1908# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001909
Benjamin Petersone711caf2008-06-11 16:44:04 +00001910#
Jesse Noller6214edd2009-01-19 16:23:53 +00001911# Test to verify handle verification, see issue 3321
1912#
1913
class TestInvalidHandle(unittest.TestCase):
    # Handle verification for Connection objects, see issue 3321.

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        expected = (ValueError, IOError)

        # A handle that does not refer to an open descriptor: polling
        # the connection must fail.
        bogus = multiprocessing.connection.Connection(44977608)
        try:
            with self.assertRaises(expected):
                bogus.poll()
        finally:
            # Hack private attribute _handle to avoid printing an error
            # in conn.__del__
            bogus._handle = None

        # A negative handle must be rejected by the constructor itself.
        with self.assertRaises(expected):
            multiprocessing.connection.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001927
Jesse Noller6214edd2009-01-19 16:23:53 +00001928#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001929# Functions used to create test cases from the base ones in this module
1930#
1931
def get_attributes(Source, names):
    """Return a dict mapping each entry of *names* to that attribute of *Source*.

    Attributes that are plain Python functions are wrapped in
    ``staticmethod`` so that they are not turned into bound methods when
    the dict is merged into a class namespace; every other object is
    stored unchanged.
    """
    function_type = type(get_attributes)
    collected = {}
    for attr_name in names:
        value = getattr(Source, attr_name)
        collected[attr_name] = (
            staticmethod(value) if type(value) == function_type else value
        )
    return collected
1940
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from the module's ``_Test*`` bases.

    Every module global whose name starts with ``_Test`` and whose
    ``ALLOWED_TYPES`` contains *type* is combined with
    ``unittest.TestCase`` and *Mixin* into a new class named
    ``With<Type><name without the leading underscore>``.

    Returns a dict mapping the new class names to the new classes.
    """
    namespace = globals()
    prefix = 'With' + type.capitalize()
    generated = {}

    # Iterate over a snapshot of the global names.
    for global_name in list(namespace):
        if not global_name.startswith('_Test'):
            continue
        base = namespace[global_name]
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        newname = prefix + global_name[1:]
        # Make the generated class report a meaningful name and appear
        # to live in the mixin's module.
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        generated[newname] = Temp
    return generated
1956 return result
1957
1958#
1959# Create test cases
1960#
1961
class ProcessesMixin(object):
    # Mixin for the 'processes' flavour of the tests: the names used by
    # the _Test* base classes are taken from the multiprocessing package.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed multiprocessing attributes straight into this class
    # namespace.  get_attributes() wraps plain functions in staticmethod.
    # No helper variables are used here because anything assigned in the
    # class body would become a class attribute as well.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

# Generate the 'processes' variant of every eligible _Test* class and
# publish the generated classes as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1974
1975
class ManagerMixin(object):
    # Mixin for the 'manager' flavour of the tests: the names used by the
    # _Test* base classes are taken from a shared SyncManager instance.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocate the manager without running __init__; test_main() calls
    # __init__() and start() on this same object before running the suite.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed manager attributes straight into this class
    # namespace (see get_attributes() for how functions are handled).
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

# Generate the 'manager' variant of every eligible _Test* class and
# publish the generated classes as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1988
1989
class ThreadsMixin(object):
    # Mixin for the 'threads' flavour of the tests: the names used by the
    # _Test* base classes are taken from multiprocessing.dummy.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed multiprocessing.dummy attributes straight into this
    # class namespace.  get_attributes() wraps plain functions in
    # staticmethod.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

# Generate the 'threads' variant of every eligible _Test* class and
# publish the generated classes as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2002
class OtherTest(unittest.TestCase):
    # Authentication-failure paths of the connection challenge handshake,
    # exercised against fake connection objects.
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        class _FakeConnection(object):
            # Accepts whatever is sent and always answers with garbage.
            def send_bytes(self, data):
                pass
            def recv_bytes(self, size):
                return b'something bogus'

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        class _FakeConnection(object):
            # First read yields the challenge marker, the second read a
            # bogus reply, and any later read an empty bytes object.
            def __init__(self):
                self.count = 0
            def send_bytes(self, data):
                pass
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
2031
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002032#
2033# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2034#
2035
def initializer(ns):
    """Increment ``ns.test`` by one.

    Passed as the initializer callable to Manager.start() and Pool() by
    TestInitializers.
    """
    counter = ns.test
    counter += 1
    ns.test = counter
2038
class TestInitializers(unittest.TestCase):
    # Test the initializer feature of Manager.start() and Pool.__init__(),
    # see issue 5585.  A manager-backed Namespace carries a counter that
    # the module-level initializer() increments.

    def setUp(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the extra manager down even when the assertion fails
            # so that its server process is not leaked.
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        # The single worker ran the initializer exactly once.
        self.assertEqual(self.ns.test, 1)
2061
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002062#
2063# Issue 5155, 5313, 5331: Test process in processes
2064# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2065#
2066
def _ThisSubProcess(q):
    """Try one non-blocking get() on *q*, ignoring an empty queue."""
    try:
        q.get(block=False)
    except pyqueue.Empty:
        # Nothing queued; that is fine.
        return
2072
def _TestProcess(q):
    """Start a child process running _ThisSubProcess and wait for it.

    The child receives a freshly created multiprocessing.Queue; the *q*
    argument itself is not used.
    """
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(
        target=_ThisSubProcess,
        args=(inner_queue,),
    )
    child.start()
    child.join()
2078
def _afunc(x):
    """Return *x* multiplied by itself (work item for pool_in_process)."""
    squared = x * x
    return squared
2081
def pool_in_process():
    """Create a four-worker Pool, map _afunc over a short list, clean up.

    Used as a Process target by TestStdinBadfiledescriptor to check that
    a Pool can be created inside a child process.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Stop accepting work and wait for the workers, so they are
        # released explicitly rather than left for interpreter exit.
        pool.close()
        pool.join()
2085
class _file_like(object):
    """Write buffer in front of *delegate* that is kept per process id.

    Data passed to write() is collected in a list; flush() joins the
    collected pieces, writes them to the delegate and empties the list.
    """

    def __init__(self, delegate):
        self._delegate = delegate
        # No buffer exists until ``cache`` is first accessed.
        self._pid = None

    @property
    def cache(self):
        current_pid = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid == current_pid:
            return self._cache
        # First access in this process: start with an empty buffer.
        self._pid = current_pid
        self._cache = []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
2105 self._cache = []
2106
class TestStdinBadfiledescriptor(unittest.TestCase):
    # Issue 5155, 5313, 5331: processes, queues and pools created from
    # inside a child process.

    def test_queue_in_process(self):
        # The child (_TestProcess) itself starts and joins a grandchild.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # The child creates and uses a Pool of its own.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this Process is constructed but never started;
        # it looks vestigial -- confirm before removing it.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # Use a unittest assertion: a bare assert is stripped under -O.
        self.assertEqual(sio.getvalue(), 'foo')
2127
# Stand-alone TestCase classes; test_main() appends them after the
# generated processes/threads/manager test cases.
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
                   TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002130
Benjamin Petersone711caf2008-06-11 16:44:04 +00002131#
2132#
2133#
2134
def test_main(run=None):
    """Build the full multiprocessing test suite and run it.

    *run* is a callable that accepts a unittest.TestSuite; when it is
    None, test.support.run_unittest is used.  Raises unittest.SkipTest
    on Linux when an RLock cannot be created (issue 3111).
    """
    if sys.platform.startswith("linux"):
        try:
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared pools/manager that the generated test cases reach through
    # their mixin classes.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        def by_name(tc):
            return tc.__name__

        testcases = (
            sorted(testcases_processes.values(), key=by_name) +
            sorted(testcases_threads.values(), key=by_name) +
            sorted(testcases_manager.values(), key=by_name) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # Tear the shared resources down even if running the suite raised,
        # so that no worker or manager processes are left behind.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Peterson41181742008-07-02 20:22:54 +00002171 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002172
def main():
    """Run the whole suite through a verbose TextTestRunner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)


if __name__ == '__main__':
    main()