blob: d53a0f859c9641f2f608e7aedecd42c749de7310 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000027# import threading after _multiprocessing to raise a more revelant error
28# message: "No module named _multiprocessing". _multiprocessing is not compiled
29# without thread support.
30import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.dummy
33import multiprocessing.connection
34import multiprocessing.managers
35import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
38from multiprocessing import util
39
Brian Curtinafa88b52010-10-07 01:12:19 +000040try:
41 from multiprocessing.sharedctypes import Value, copy
42 HAS_SHAREDCTYPES = True
43except ImportError:
44 HAS_SHAREDCTYPES = False
45
Benjamin Petersone711caf2008-06-11 16:44:04 +000046#
47#
48#
49
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000050def latin(s):
51 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000052
Benjamin Petersone711caf2008-06-11 16:44:04 +000053#
54# Constants
55#
56
57LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000058#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000059
60DELTA = 0.1
61CHECK_TIMINGS = False # making true makes tests take a lot longer
62 # and can sometimes cause some non-serious
63 # failures because some calls block a bit
64 # longer than expected
65if CHECK_TIMINGS:
66 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
67else:
68 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
69
70HAVE_GETVALUE = not getattr(_multiprocessing,
71 'HAVE_BROKEN_SEM_GETVALUE', False)
72
Jesse Noller6214edd2009-01-19 16:23:53 +000073WIN32 = (sys.platform == "win32")
74
Benjamin Petersone711caf2008-06-11 16:44:04 +000075#
Florent Xiclunafd1b0932010-03-28 00:25:02 +000076# Some tests require ctypes
77#
78
79try:
Florent Xiclunaaa171062010-08-14 15:56:42 +000080 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +000081except ImportError:
82 Structure = object
83 c_int = c_double = None
84
85#
Benjamin Petersone711caf2008-06-11 16:44:04 +000086# Creates a wrapper for a function which records the time it takes to finish
87#
88
89class TimingWrapper(object):
90
91 def __init__(self, func):
92 self.func = func
93 self.elapsed = None
94
95 def __call__(self, *args, **kwds):
96 t = time.time()
97 try:
98 return self.func(*args, **kwds)
99 finally:
100 self.elapsed = time.time() - t
101
102#
103# Base class for test cases
104#
105
106class BaseTestCase(object):
107
108 ALLOWED_TYPES = ('processes', 'manager', 'threads')
109
110 def assertTimingAlmostEqual(self, a, b):
111 if CHECK_TIMINGS:
112 self.assertAlmostEqual(a, b, 1)
113
114 def assertReturnsIfImplemented(self, value, func, *args):
115 try:
116 res = func(*args)
117 except NotImplementedError:
118 pass
119 else:
120 return self.assertEqual(value, res)
121
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000122 # For the sanity of Windows users, rather than crashing or freezing in
123 # multiple ways.
124 def __reduce__(self, *args):
125 raise NotImplementedError("shouldn't try to pickle a test case")
126
127 __reduce_ex__ = __reduce__
128
Benjamin Petersone711caf2008-06-11 16:44:04 +0000129#
130# Return the value of a semaphore
131#
132
133def get_value(self):
134 try:
135 return self.get_value()
136 except AttributeError:
137 try:
138 return self._Semaphore__value
139 except AttributeError:
140 try:
141 return self._value
142 except AttributeError:
143 raise NotImplementedError
144
145#
146# Testcases
147#
148
149class _TestProcess(BaseTestCase):
150
151 ALLOWED_TYPES = ('processes', 'threads')
152
153 def test_current(self):
154 if self.TYPE == 'threads':
155 return
156
157 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000158 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000159
160 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000161 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000162 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000163 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000164 self.assertEqual(current.ident, os.getpid())
165 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000166
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000167 def test_daemon_argument(self):
168 if self.TYPE == "threads":
169 return
170
171 # By default uses the current process's daemon flag.
172 proc0 = self.Process(target=self._test)
Antoine Pitrouec785222011-03-02 00:15:44 +0000173 self.assertEqual(proc0.daemon, self.current_process().daemon)
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000174 proc1 = self.Process(target=self._test, daemon=True)
175 self.assertTrue(proc1.daemon)
176 proc2 = self.Process(target=self._test, daemon=False)
177 self.assertFalse(proc2.daemon)
178
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000179 @classmethod
180 def _test(cls, q, *args, **kwds):
181 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000182 q.put(args)
183 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000184 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000185 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000186 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000187 q.put(current.pid)
188
189 def test_process(self):
190 q = self.Queue(1)
191 e = self.Event()
192 args = (q, 1, 2)
193 kwargs = {'hello':23, 'bye':2.54}
194 name = 'SomeProcess'
195 p = self.Process(
196 target=self._test, args=args, kwargs=kwargs, name=name
197 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000198 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000199 current = self.current_process()
200
201 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000202 self.assertEqual(p.authkey, current.authkey)
203 self.assertEqual(p.is_alive(), False)
204 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000205 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000206 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000207 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000208
209 p.start()
210
Ezio Melottib3aedd42010-11-20 19:04:17 +0000211 self.assertEqual(p.exitcode, None)
212 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000213 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000214
Ezio Melottib3aedd42010-11-20 19:04:17 +0000215 self.assertEqual(q.get(), args[1:])
216 self.assertEqual(q.get(), kwargs)
217 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000218 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000219 self.assertEqual(q.get(), current.authkey)
220 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000221
222 p.join()
223
Ezio Melottib3aedd42010-11-20 19:04:17 +0000224 self.assertEqual(p.exitcode, 0)
225 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000226 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000227
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000228 @classmethod
229 def _test_terminate(cls):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000230 time.sleep(1000)
231
232 def test_terminate(self):
233 if self.TYPE == 'threads':
234 return
235
236 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000237 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000238 p.start()
239
240 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000241 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000242 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000243
244 p.terminate()
245
246 join = TimingWrapper(p.join)
247 self.assertEqual(join(), None)
248 self.assertTimingAlmostEqual(join.elapsed, 0.0)
249
250 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000251 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000252
253 p.join()
254
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000255 # XXX sometimes get p.exitcode == 0 on Windows ...
256 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000257
258 def test_cpu_count(self):
259 try:
260 cpus = multiprocessing.cpu_count()
261 except NotImplementedError:
262 cpus = 1
263 self.assertTrue(type(cpus) is int)
264 self.assertTrue(cpus >= 1)
265
266 def test_active_children(self):
267 self.assertEqual(type(self.active_children()), list)
268
269 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000270 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000271
272 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000273 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000274
275 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000276 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000277
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000278 @classmethod
279 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000280 from multiprocessing import forking
281 wconn.send(id)
282 if len(id) < 2:
283 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000284 p = cls.Process(
285 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000286 )
287 p.start()
288 p.join()
289
290 def test_recursion(self):
291 rconn, wconn = self.Pipe(duplex=False)
292 self._test_recursion(wconn, [])
293
294 time.sleep(DELTA)
295 result = []
296 while rconn.poll():
297 result.append(rconn.recv())
298
299 expected = [
300 [],
301 [0],
302 [0, 0],
303 [0, 1],
304 [1],
305 [1, 0],
306 [1, 1]
307 ]
308 self.assertEqual(result, expected)
309
310#
311#
312#
313
314class _UpperCaser(multiprocessing.Process):
315
316 def __init__(self):
317 multiprocessing.Process.__init__(self)
318 self.child_conn, self.parent_conn = multiprocessing.Pipe()
319
320 def run(self):
321 self.parent_conn.close()
322 for s in iter(self.child_conn.recv, None):
323 self.child_conn.send(s.upper())
324 self.child_conn.close()
325
326 def submit(self, s):
327 assert type(s) is str
328 self.parent_conn.send(s)
329 return self.parent_conn.recv()
330
331 def stop(self):
332 self.parent_conn.send(None)
333 self.parent_conn.close()
334 self.child_conn.close()
335
336class _TestSubclassingProcess(BaseTestCase):
337
338 ALLOWED_TYPES = ('processes',)
339
340 def test_subclassing(self):
341 uppercaser = _UpperCaser()
342 uppercaser.start()
343 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
344 self.assertEqual(uppercaser.submit('world'), 'WORLD')
345 uppercaser.stop()
346 uppercaser.join()
347
348#
349#
350#
351
352def queue_empty(q):
353 if hasattr(q, 'empty'):
354 return q.empty()
355 else:
356 return q.qsize() == 0
357
358def queue_full(q, maxsize):
359 if hasattr(q, 'full'):
360 return q.full()
361 else:
362 return q.qsize() == maxsize
363
364
365class _TestQueue(BaseTestCase):
366
367
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000368 @classmethod
369 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000370 child_can_start.wait()
371 for i in range(6):
372 queue.get()
373 parent_can_continue.set()
374
375 def test_put(self):
376 MAXSIZE = 6
377 queue = self.Queue(maxsize=MAXSIZE)
378 child_can_start = self.Event()
379 parent_can_continue = self.Event()
380
381 proc = self.Process(
382 target=self._test_put,
383 args=(queue, child_can_start, parent_can_continue)
384 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000385 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000386 proc.start()
387
388 self.assertEqual(queue_empty(queue), True)
389 self.assertEqual(queue_full(queue, MAXSIZE), False)
390
391 queue.put(1)
392 queue.put(2, True)
393 queue.put(3, True, None)
394 queue.put(4, False)
395 queue.put(5, False, None)
396 queue.put_nowait(6)
397
398 # the values may be in buffer but not yet in pipe so sleep a bit
399 time.sleep(DELTA)
400
401 self.assertEqual(queue_empty(queue), False)
402 self.assertEqual(queue_full(queue, MAXSIZE), True)
403
404 put = TimingWrapper(queue.put)
405 put_nowait = TimingWrapper(queue.put_nowait)
406
407 self.assertRaises(pyqueue.Full, put, 7, False)
408 self.assertTimingAlmostEqual(put.elapsed, 0)
409
410 self.assertRaises(pyqueue.Full, put, 7, False, None)
411 self.assertTimingAlmostEqual(put.elapsed, 0)
412
413 self.assertRaises(pyqueue.Full, put_nowait, 7)
414 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
415
416 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
417 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
418
419 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
420 self.assertTimingAlmostEqual(put.elapsed, 0)
421
422 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
423 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
424
425 child_can_start.set()
426 parent_can_continue.wait()
427
428 self.assertEqual(queue_empty(queue), True)
429 self.assertEqual(queue_full(queue, MAXSIZE), False)
430
431 proc.join()
432
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000433 @classmethod
434 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000435 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000436 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000437 queue.put(2)
438 queue.put(3)
439 queue.put(4)
440 queue.put(5)
441 parent_can_continue.set()
442
443 def test_get(self):
444 queue = self.Queue()
445 child_can_start = self.Event()
446 parent_can_continue = self.Event()
447
448 proc = self.Process(
449 target=self._test_get,
450 args=(queue, child_can_start, parent_can_continue)
451 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000452 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000453 proc.start()
454
455 self.assertEqual(queue_empty(queue), True)
456
457 child_can_start.set()
458 parent_can_continue.wait()
459
460 time.sleep(DELTA)
461 self.assertEqual(queue_empty(queue), False)
462
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000463 # Hangs unexpectedly, remove for now
464 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000465 self.assertEqual(queue.get(True, None), 2)
466 self.assertEqual(queue.get(True), 3)
467 self.assertEqual(queue.get(timeout=1), 4)
468 self.assertEqual(queue.get_nowait(), 5)
469
470 self.assertEqual(queue_empty(queue), True)
471
472 get = TimingWrapper(queue.get)
473 get_nowait = TimingWrapper(queue.get_nowait)
474
475 self.assertRaises(pyqueue.Empty, get, False)
476 self.assertTimingAlmostEqual(get.elapsed, 0)
477
478 self.assertRaises(pyqueue.Empty, get, False, None)
479 self.assertTimingAlmostEqual(get.elapsed, 0)
480
481 self.assertRaises(pyqueue.Empty, get_nowait)
482 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
483
484 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
485 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
486
487 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
488 self.assertTimingAlmostEqual(get.elapsed, 0)
489
490 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
491 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
492
493 proc.join()
494
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000495 @classmethod
496 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000497 for i in range(10, 20):
498 queue.put(i)
499 # note that at this point the items may only be buffered, so the
500 # process cannot shutdown until the feeder thread has finished
501 # pushing items onto the pipe.
502
503 def test_fork(self):
504 # Old versions of Queue would fail to create a new feeder
505 # thread for a forked process if the original process had its
506 # own feeder thread. This test checks that this no longer
507 # happens.
508
509 queue = self.Queue()
510
511 # put items on queue so that main process starts a feeder thread
512 for i in range(10):
513 queue.put(i)
514
515 # wait to make sure thread starts before we fork a new process
516 time.sleep(DELTA)
517
518 # fork process
519 p = self.Process(target=self._test_fork, args=(queue,))
520 p.start()
521
522 # check that all expected items are in the queue
523 for i in range(20):
524 self.assertEqual(queue.get(), i)
525 self.assertRaises(pyqueue.Empty, queue.get, False)
526
527 p.join()
528
529 def test_qsize(self):
530 q = self.Queue()
531 try:
532 self.assertEqual(q.qsize(), 0)
533 except NotImplementedError:
534 return
535 q.put(1)
536 self.assertEqual(q.qsize(), 1)
537 q.put(5)
538 self.assertEqual(q.qsize(), 2)
539 q.get()
540 self.assertEqual(q.qsize(), 1)
541 q.get()
542 self.assertEqual(q.qsize(), 0)
543
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000544 @classmethod
545 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000546 for obj in iter(q.get, None):
547 time.sleep(DELTA)
548 q.task_done()
549
550 def test_task_done(self):
551 queue = self.JoinableQueue()
552
553 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000554 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000555
556 workers = [self.Process(target=self._test_task_done, args=(queue,))
557 for i in range(4)]
558
559 for p in workers:
560 p.start()
561
562 for i in range(10):
563 queue.put(i)
564
565 queue.join()
566
567 for p in workers:
568 queue.put(None)
569
570 for p in workers:
571 p.join()
572
573#
574#
575#
576
577class _TestLock(BaseTestCase):
578
579 def test_lock(self):
580 lock = self.Lock()
581 self.assertEqual(lock.acquire(), True)
582 self.assertEqual(lock.acquire(False), False)
583 self.assertEqual(lock.release(), None)
584 self.assertRaises((ValueError, threading.ThreadError), lock.release)
585
586 def test_rlock(self):
587 lock = self.RLock()
588 self.assertEqual(lock.acquire(), True)
589 self.assertEqual(lock.acquire(), True)
590 self.assertEqual(lock.acquire(), True)
591 self.assertEqual(lock.release(), None)
592 self.assertEqual(lock.release(), None)
593 self.assertEqual(lock.release(), None)
594 self.assertRaises((AssertionError, RuntimeError), lock.release)
595
Jesse Nollerf8d00852009-03-31 03:25:07 +0000596 def test_lock_context(self):
597 with self.Lock():
598 pass
599
Benjamin Petersone711caf2008-06-11 16:44:04 +0000600
601class _TestSemaphore(BaseTestCase):
602
603 def _test_semaphore(self, sem):
604 self.assertReturnsIfImplemented(2, get_value, sem)
605 self.assertEqual(sem.acquire(), True)
606 self.assertReturnsIfImplemented(1, get_value, sem)
607 self.assertEqual(sem.acquire(), True)
608 self.assertReturnsIfImplemented(0, get_value, sem)
609 self.assertEqual(sem.acquire(False), False)
610 self.assertReturnsIfImplemented(0, get_value, sem)
611 self.assertEqual(sem.release(), None)
612 self.assertReturnsIfImplemented(1, get_value, sem)
613 self.assertEqual(sem.release(), None)
614 self.assertReturnsIfImplemented(2, get_value, sem)
615
616 def test_semaphore(self):
617 sem = self.Semaphore(2)
618 self._test_semaphore(sem)
619 self.assertEqual(sem.release(), None)
620 self.assertReturnsIfImplemented(3, get_value, sem)
621 self.assertEqual(sem.release(), None)
622 self.assertReturnsIfImplemented(4, get_value, sem)
623
624 def test_bounded_semaphore(self):
625 sem = self.BoundedSemaphore(2)
626 self._test_semaphore(sem)
627 # Currently fails on OS/X
628 #if HAVE_GETVALUE:
629 # self.assertRaises(ValueError, sem.release)
630 # self.assertReturnsIfImplemented(2, get_value, sem)
631
632 def test_timeout(self):
633 if self.TYPE != 'processes':
634 return
635
636 sem = self.Semaphore(0)
637 acquire = TimingWrapper(sem.acquire)
638
639 self.assertEqual(acquire(False), False)
640 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
641
642 self.assertEqual(acquire(False, None), False)
643 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
644
645 self.assertEqual(acquire(False, TIMEOUT1), False)
646 self.assertTimingAlmostEqual(acquire.elapsed, 0)
647
648 self.assertEqual(acquire(True, TIMEOUT2), False)
649 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
650
651 self.assertEqual(acquire(timeout=TIMEOUT3), False)
652 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
653
654
655class _TestCondition(BaseTestCase):
656
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000657 @classmethod
658 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000659 cond.acquire()
660 sleeping.release()
661 cond.wait(timeout)
662 woken.release()
663 cond.release()
664
665 def check_invariant(self, cond):
666 # this is only supposed to succeed when there are no sleepers
667 if self.TYPE == 'processes':
668 try:
669 sleepers = (cond._sleeping_count.get_value() -
670 cond._woken_count.get_value())
671 self.assertEqual(sleepers, 0)
672 self.assertEqual(cond._wait_semaphore.get_value(), 0)
673 except NotImplementedError:
674 pass
675
676 def test_notify(self):
677 cond = self.Condition()
678 sleeping = self.Semaphore(0)
679 woken = self.Semaphore(0)
680
681 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000682 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000683 p.start()
684
685 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000686 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000687 p.start()
688
689 # wait for both children to start sleeping
690 sleeping.acquire()
691 sleeping.acquire()
692
693 # check no process/thread has woken up
694 time.sleep(DELTA)
695 self.assertReturnsIfImplemented(0, get_value, woken)
696
697 # wake up one process/thread
698 cond.acquire()
699 cond.notify()
700 cond.release()
701
702 # check one process/thread has woken up
703 time.sleep(DELTA)
704 self.assertReturnsIfImplemented(1, get_value, woken)
705
706 # wake up another
707 cond.acquire()
708 cond.notify()
709 cond.release()
710
711 # check other has woken up
712 time.sleep(DELTA)
713 self.assertReturnsIfImplemented(2, get_value, woken)
714
715 # check state is not mucked up
716 self.check_invariant(cond)
717 p.join()
718
719 def test_notify_all(self):
720 cond = self.Condition()
721 sleeping = self.Semaphore(0)
722 woken = self.Semaphore(0)
723
724 # start some threads/processes which will timeout
725 for i in range(3):
726 p = self.Process(target=self.f,
727 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000728 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000729 p.start()
730
731 t = threading.Thread(target=self.f,
732 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000733 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000734 t.start()
735
736 # wait for them all to sleep
737 for i in range(6):
738 sleeping.acquire()
739
740 # check they have all timed out
741 for i in range(6):
742 woken.acquire()
743 self.assertReturnsIfImplemented(0, get_value, woken)
744
745 # check state is not mucked up
746 self.check_invariant(cond)
747
748 # start some more threads/processes
749 for i in range(3):
750 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000751 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000752 p.start()
753
754 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000755 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000756 t.start()
757
758 # wait for them to all sleep
759 for i in range(6):
760 sleeping.acquire()
761
762 # check no process/thread has woken up
763 time.sleep(DELTA)
764 self.assertReturnsIfImplemented(0, get_value, woken)
765
766 # wake them all up
767 cond.acquire()
768 cond.notify_all()
769 cond.release()
770
771 # check they have all woken
772 time.sleep(DELTA)
773 self.assertReturnsIfImplemented(6, get_value, woken)
774
775 # check state is not mucked up
776 self.check_invariant(cond)
777
778 def test_timeout(self):
779 cond = self.Condition()
780 wait = TimingWrapper(cond.wait)
781 cond.acquire()
782 res = wait(TIMEOUT1)
783 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000784 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000785 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
786
787
788class _TestEvent(BaseTestCase):
789
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000790 @classmethod
791 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000792 time.sleep(TIMEOUT2)
793 event.set()
794
795 def test_event(self):
796 event = self.Event()
797 wait = TimingWrapper(event.wait)
798
Ezio Melotti13925002011-03-16 11:05:33 +0200799 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +0000800 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +0000801 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000802
Benjamin Peterson965ce872009-04-05 21:24:58 +0000803 # Removed, threading.Event.wait() will return the value of the __flag
804 # instead of None. API Shear with the semaphore backed mp.Event
805 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000806 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000807 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000808 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
809
810 event.set()
811
812 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +0000813 self.assertEqual(event.is_set(), True)
814 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000815 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000816 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000817 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
818 # self.assertEqual(event.is_set(), True)
819
820 event.clear()
821
822 #self.assertEqual(event.is_set(), False)
823
824 self.Process(target=self._test_event, args=(event,)).start()
Benjamin Peterson965ce872009-04-05 21:24:58 +0000825 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000826
827#
828#
829#
830
831class _TestValue(BaseTestCase):
832
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000833 ALLOWED_TYPES = ('processes',)
834
Benjamin Petersone711caf2008-06-11 16:44:04 +0000835 codes_values = [
836 ('i', 4343, 24234),
837 ('d', 3.625, -4.25),
838 ('h', -232, 234),
839 ('c', latin('x'), latin('y'))
840 ]
841
Antoine Pitrou7744e2a2010-11-22 16:26:21 +0000842 def setUp(self):
843 if not HAS_SHAREDCTYPES:
844 self.skipTest("requires multiprocessing.sharedctypes")
845
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000846 @classmethod
847 def _test(cls, values):
848 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000849 sv.value = cv[2]
850
851
852 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000853 if raw:
854 values = [self.RawValue(code, value)
855 for code, value, _ in self.codes_values]
856 else:
857 values = [self.Value(code, value)
858 for code, value, _ in self.codes_values]
859
860 for sv, cv in zip(values, self.codes_values):
861 self.assertEqual(sv.value, cv[1])
862
863 proc = self.Process(target=self._test, args=(values,))
864 proc.start()
865 proc.join()
866
867 for sv, cv in zip(values, self.codes_values):
868 self.assertEqual(sv.value, cv[2])
869
870 def test_rawvalue(self):
871 self.test_value(raw=True)
872
873 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000874 val1 = self.Value('i', 5)
875 lock1 = val1.get_lock()
876 obj1 = val1.get_obj()
877
878 val2 = self.Value('i', 5, lock=None)
879 lock2 = val2.get_lock()
880 obj2 = val2.get_obj()
881
882 lock = self.Lock()
883 val3 = self.Value('i', 5, lock=lock)
884 lock3 = val3.get_lock()
885 obj3 = val3.get_obj()
886 self.assertEqual(lock, lock3)
887
Jesse Nollerb0516a62009-01-18 03:11:38 +0000888 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000889 self.assertFalse(hasattr(arr4, 'get_lock'))
890 self.assertFalse(hasattr(arr4, 'get_obj'))
891
Jesse Nollerb0516a62009-01-18 03:11:38 +0000892 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
893
894 arr5 = self.RawValue('i', 5)
895 self.assertFalse(hasattr(arr5, 'get_lock'))
896 self.assertFalse(hasattr(arr5, 'get_obj'))
897
Benjamin Petersone711caf2008-06-11 16:44:04 +0000898
899class _TestArray(BaseTestCase):
900
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000901 ALLOWED_TYPES = ('processes',)
902
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000903 @classmethod
904 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000905 for i in range(1, len(seq)):
906 seq[i] += seq[i-1]
907
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000908 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000909 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000910 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
911 if raw:
912 arr = self.RawArray('i', seq)
913 else:
914 arr = self.Array('i', seq)
915
916 self.assertEqual(len(arr), len(seq))
917 self.assertEqual(arr[3], seq[3])
918 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
919
920 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
921
922 self.assertEqual(list(arr[:]), seq)
923
924 self.f(seq)
925
926 p = self.Process(target=self.f, args=(arr,))
927 p.start()
928 p.join()
929
930 self.assertEqual(list(arr[:]), seq)
931
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000932 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +0000933 def test_array_from_size(self):
934 size = 10
935 # Test for zeroing (see issue #11675).
936 # The repetition below strengthens the test by increasing the chances
937 # of previously allocated non-zero memory being used for the new array
938 # on the 2nd and 3rd loops.
939 for _ in range(3):
940 arr = self.Array('i', size)
941 self.assertEqual(len(arr), size)
942 self.assertEqual(list(arr), [0] * size)
943 arr[:] = range(10)
944 self.assertEqual(list(arr), list(range(10)))
945 del arr
946
947 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000948 def test_rawarray(self):
949 self.test_array(raw=True)
950
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000951 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000952 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000953 arr1 = self.Array('i', list(range(10)))
954 lock1 = arr1.get_lock()
955 obj1 = arr1.get_obj()
956
957 arr2 = self.Array('i', list(range(10)), lock=None)
958 lock2 = arr2.get_lock()
959 obj2 = arr2.get_obj()
960
961 lock = self.Lock()
962 arr3 = self.Array('i', list(range(10)), lock=lock)
963 lock3 = arr3.get_lock()
964 obj3 = arr3.get_obj()
965 self.assertEqual(lock, lock3)
966
Jesse Nollerb0516a62009-01-18 03:11:38 +0000967 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000968 self.assertFalse(hasattr(arr4, 'get_lock'))
969 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +0000970 self.assertRaises(AttributeError,
971 self.Array, 'i', range(10), lock='notalock')
972
973 arr5 = self.RawArray('i', range(10))
974 self.assertFalse(hasattr(arr5, 'get_lock'))
975 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000976
977#
978#
979#
980
981class _TestContainers(BaseTestCase):
982
983 ALLOWED_TYPES = ('manager',)
984
985 def test_list(self):
986 a = self.list(list(range(10)))
987 self.assertEqual(a[:], list(range(10)))
988
989 b = self.list()
990 self.assertEqual(b[:], [])
991
992 b.extend(list(range(5)))
993 self.assertEqual(b[:], list(range(5)))
994
995 self.assertEqual(b[2], 2)
996 self.assertEqual(b[2:10], [2,3,4])
997
998 b *= 2
999 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1000
1001 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1002
1003 self.assertEqual(a[:], list(range(10)))
1004
1005 d = [a, b]
1006 e = self.list(d)
1007 self.assertEqual(
1008 e[:],
1009 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1010 )
1011
1012 f = self.list([a])
1013 a.append('hello')
1014 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1015
1016 def test_dict(self):
1017 d = self.dict()
1018 indices = list(range(65, 70))
1019 for i in indices:
1020 d[i] = chr(i)
1021 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1022 self.assertEqual(sorted(d.keys()), indices)
1023 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1024 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1025
1026 def test_namespace(self):
1027 n = self.Namespace()
1028 n.name = 'Bob'
1029 n.job = 'Builder'
1030 n._hidden = 'hidden'
1031 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1032 del n.job
1033 self.assertEqual(str(n), "Namespace(name='Bob')")
1034 self.assertTrue(hasattr(n, 'name'))
1035 self.assertTrue(not hasattr(n, 'job'))
1036
1037#
1038#
1039#
1040
1041def sqr(x, wait=0.0):
1042 time.sleep(wait)
1043 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00001044
Benjamin Petersone711caf2008-06-11 16:44:04 +00001045class _TestPool(BaseTestCase):
1046
1047 def test_apply(self):
1048 papply = self.pool.apply
1049 self.assertEqual(papply(sqr, (5,)), sqr(5))
1050 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1051
1052 def test_map(self):
1053 pmap = self.pool.map
1054 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1055 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1056 list(map(sqr, list(range(100)))))
1057
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001058 def test_map_chunksize(self):
1059 try:
1060 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1061 except multiprocessing.TimeoutError:
1062 self.fail("pool.map_async with chunksize stalled on null list")
1063
Benjamin Petersone711caf2008-06-11 16:44:04 +00001064 def test_async(self):
1065 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1066 get = TimingWrapper(res.get)
1067 self.assertEqual(get(), 49)
1068 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1069
1070 def test_async_timeout(self):
1071 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1072 get = TimingWrapper(res.get)
1073 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1074 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1075
1076 def test_imap(self):
1077 it = self.pool.imap(sqr, list(range(10)))
1078 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1079
1080 it = self.pool.imap(sqr, list(range(10)))
1081 for i in range(10):
1082 self.assertEqual(next(it), i*i)
1083 self.assertRaises(StopIteration, it.__next__)
1084
1085 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1086 for i in range(1000):
1087 self.assertEqual(next(it), i*i)
1088 self.assertRaises(StopIteration, it.__next__)
1089
1090 def test_imap_unordered(self):
1091 it = self.pool.imap_unordered(sqr, list(range(1000)))
1092 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1093
1094 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1095 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1096
1097 def test_make_pool(self):
1098 p = multiprocessing.Pool(3)
1099 self.assertEqual(3, len(p._pool))
1100 p.close()
1101 p.join()
1102
1103 def test_terminate(self):
1104 if self.TYPE == 'manager':
1105 # On Unix a forked process increfs each shared object to
1106 # which its parent process held a reference. If the
1107 # forked process gets terminated then there is likely to
1108 # be a reference leak. So to prevent
1109 # _TestZZZNumberOfObjects from failing we skip this test
1110 # when using a manager.
1111 return
1112
1113 result = self.pool.map_async(
1114 time.sleep, [0.1 for i in range(10000)], chunksize=1
1115 )
1116 self.pool.terminate()
1117 join = TimingWrapper(self.pool.join)
1118 join()
Victor Stinner900189b2011-03-24 16:39:07 +01001119 self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001120
Ask Solem2afcbf22010-11-09 20:55:52 +00001121def raising():
1122 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00001123
Ask Solem2afcbf22010-11-09 20:55:52 +00001124def unpickleable_result():
1125 return lambda: 42
1126
1127class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00001128 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00001129
1130 def test_async_error_callback(self):
1131 p = multiprocessing.Pool(2)
1132
1133 scratchpad = [None]
1134 def errback(exc):
1135 scratchpad[0] = exc
1136
1137 res = p.apply_async(raising, error_callback=errback)
1138 self.assertRaises(KeyError, res.get)
1139 self.assertTrue(scratchpad[0])
1140 self.assertIsInstance(scratchpad[0], KeyError)
1141
1142 p.close()
1143 p.join()
1144
1145 def test_unpickleable_result(self):
1146 from multiprocessing.pool import MaybeEncodingError
1147 p = multiprocessing.Pool(2)
1148
1149 # Make sure we don't lose pool processes because of encoding errors.
1150 for iteration in range(20):
1151
1152 scratchpad = [None]
1153 def errback(exc):
1154 scratchpad[0] = exc
1155
1156 res = p.apply_async(unpickleable_result, error_callback=errback)
1157 self.assertRaises(MaybeEncodingError, res.get)
1158 wrapped = scratchpad[0]
1159 self.assertTrue(wrapped)
1160 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
1161 self.assertIsNotNone(wrapped.exc)
1162 self.assertIsNotNone(wrapped.value)
1163
1164 p.close()
1165 p.join()
1166
1167class _TestPoolWorkerLifetime(BaseTestCase):
1168 ALLOWED_TYPES = ('processes', )
1169
Jesse Noller1f0b6582010-01-27 03:36:01 +00001170 def test_pool_worker_lifetime(self):
1171 p = multiprocessing.Pool(3, maxtasksperchild=10)
1172 self.assertEqual(3, len(p._pool))
1173 origworkerpids = [w.pid for w in p._pool]
1174 # Run many tasks so each worker gets replaced (hopefully)
1175 results = []
1176 for i in range(100):
1177 results.append(p.apply_async(sqr, (i, )))
1178 # Fetch the results and verify we got the right answers,
1179 # also ensuring all the tasks have completed.
1180 for (j, res) in enumerate(results):
1181 self.assertEqual(res.get(), sqr(j))
1182 # Refill the pool
1183 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001184 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02001185 # (countdown * DELTA = 5 seconds max startup process time)
1186 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00001187 while countdown and not all(w.is_alive() for w in p._pool):
1188 countdown -= 1
1189 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001190 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001191 # All pids should be assigned. See issue #7805.
1192 self.assertNotIn(None, origworkerpids)
1193 self.assertNotIn(None, finalworkerpids)
1194 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001195 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1196 p.close()
1197 p.join()
1198
Benjamin Petersone711caf2008-06-11 16:44:04 +00001199#
1200# Test that manager has expected number of shared objects left
1201#
1202
1203class _TestZZZNumberOfObjects(BaseTestCase):
1204 # Because test cases are sorted alphabetically, this one will get
1205 # run after all the other tests for the manager. It tests that
1206 # there have been no "reference leaks" for the manager's shared
1207 # objects. Note the comment in _TestPool.test_terminate().
1208 ALLOWED_TYPES = ('manager',)
1209
1210 def test_number_of_objects(self):
1211 EXPECTED_NUMBER = 1 # the pool object is still alive
1212 multiprocessing.active_children() # discard dead process objs
1213 gc.collect() # do garbage collection
1214 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001215 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001216 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001217 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001218 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001219
1220 self.assertEqual(refs, EXPECTED_NUMBER)
1221
1222#
1223# Test of creating a customized manager class
1224#
1225
1226from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1227
1228class FooBar(object):
1229 def f(self):
1230 return 'f()'
1231 def g(self):
1232 raise ValueError
1233 def _h(self):
1234 return '_h()'
1235
1236def baz():
1237 for i in range(10):
1238 yield i*i
1239
1240class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00001241 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001242 def __iter__(self):
1243 return self
1244 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001245 return self._callmethod('__next__')
1246
1247class MyManager(BaseManager):
1248 pass
1249
1250MyManager.register('Foo', callable=FooBar)
1251MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1252MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1253
1254
1255class _TestMyManager(BaseTestCase):
1256
1257 ALLOWED_TYPES = ('manager',)
1258
1259 def test_mymanager(self):
1260 manager = MyManager()
1261 manager.start()
1262
1263 foo = manager.Foo()
1264 bar = manager.Bar()
1265 baz = manager.baz()
1266
1267 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1268 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1269
1270 self.assertEqual(foo_methods, ['f', 'g'])
1271 self.assertEqual(bar_methods, ['f', '_h'])
1272
1273 self.assertEqual(foo.f(), 'f()')
1274 self.assertRaises(ValueError, foo.g)
1275 self.assertEqual(foo._callmethod('f'), 'f()')
1276 self.assertRaises(RemoteError, foo._callmethod, '_h')
1277
1278 self.assertEqual(bar.f(), 'f()')
1279 self.assertEqual(bar._h(), '_h()')
1280 self.assertEqual(bar._callmethod('f'), 'f()')
1281 self.assertEqual(bar._callmethod('_h'), '_h()')
1282
1283 self.assertEqual(list(baz), [i*i for i in range(10)])
1284
1285 manager.shutdown()
1286
1287#
1288# Test of connecting to a remote server and using xmlrpclib for serialization
1289#
1290
1291_queue = pyqueue.Queue()
1292def get_queue():
1293 return _queue
1294
1295class QueueManager(BaseManager):
1296 '''manager class used by server process'''
1297QueueManager.register('get_queue', callable=get_queue)
1298
1299class QueueManager2(BaseManager):
1300 '''manager class which specifies the same interface as QueueManager'''
1301QueueManager2.register('get_queue')
1302
1303
1304SERIALIZER = 'xmlrpclib'
1305
1306class _TestRemoteManager(BaseTestCase):
1307
1308 ALLOWED_TYPES = ('manager',)
1309
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001310 @classmethod
1311 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001312 manager = QueueManager2(
1313 address=address, authkey=authkey, serializer=SERIALIZER
1314 )
1315 manager.connect()
1316 queue = manager.get_queue()
1317 queue.put(('hello world', None, True, 2.25))
1318
1319 def test_remote(self):
1320 authkey = os.urandom(32)
1321
1322 manager = QueueManager(
1323 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1324 )
1325 manager.start()
1326
1327 p = self.Process(target=self._putter, args=(manager.address, authkey))
1328 p.start()
1329
1330 manager2 = QueueManager2(
1331 address=manager.address, authkey=authkey, serializer=SERIALIZER
1332 )
1333 manager2.connect()
1334 queue = manager2.get_queue()
1335
1336 # Note that xmlrpclib will deserialize object as a list not a tuple
1337 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1338
1339 # Because we are using xmlrpclib for serialization instead of
1340 # pickle this will cause a serialization error.
1341 self.assertRaises(Exception, queue.put, time.sleep)
1342
1343 # Make queue finalizer run before the server is stopped
1344 del queue
1345 manager.shutdown()
1346
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001347class _TestManagerRestart(BaseTestCase):
1348
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001349 @classmethod
1350 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001351 manager = QueueManager(
1352 address=address, authkey=authkey, serializer=SERIALIZER)
1353 manager.connect()
1354 queue = manager.get_queue()
1355 queue.put('hello world')
1356
1357 def test_rapid_restart(self):
1358 authkey = os.urandom(32)
1359 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001360 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin50be1ca2010-11-01 05:10:44 +00001361 srvr = manager.get_server()
1362 addr = srvr.address
1363 # Close the connection.Listener socket which gets opened as a part
1364 # of manager.get_server(). It's not needed for the test.
1365 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001366 manager.start()
1367
1368 p = self.Process(target=self._putter, args=(manager.address, authkey))
1369 p.start()
1370 queue = manager.get_queue()
1371 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001372 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001373 manager.shutdown()
1374 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001375 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02001376 try:
1377 manager.start()
1378 except IOError as e:
1379 if e.errno != errno.EADDRINUSE:
1380 raise
1381 # Retry after some time, in case the old socket was lingering
1382 # (sporadic failure on buildbots)
1383 time.sleep(1.0)
1384 manager = QueueManager(
1385 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller35d1f002009-03-30 22:59:27 +00001386 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001387
Benjamin Petersone711caf2008-06-11 16:44:04 +00001388#
1389#
1390#
1391
1392SENTINEL = latin('')
1393
1394class _TestConnection(BaseTestCase):
1395
1396 ALLOWED_TYPES = ('processes', 'threads')
1397
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001398 @classmethod
1399 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001400 for msg in iter(conn.recv_bytes, SENTINEL):
1401 conn.send_bytes(msg)
1402 conn.close()
1403
1404 def test_connection(self):
1405 conn, child_conn = self.Pipe()
1406
1407 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001408 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001409 p.start()
1410
1411 seq = [1, 2.25, None]
1412 msg = latin('hello world')
1413 longmsg = msg * 10
1414 arr = array.array('i', list(range(4)))
1415
1416 if self.TYPE == 'processes':
1417 self.assertEqual(type(conn.fileno()), int)
1418
1419 self.assertEqual(conn.send(seq), None)
1420 self.assertEqual(conn.recv(), seq)
1421
1422 self.assertEqual(conn.send_bytes(msg), None)
1423 self.assertEqual(conn.recv_bytes(), msg)
1424
1425 if self.TYPE == 'processes':
1426 buffer = array.array('i', [0]*10)
1427 expected = list(arr) + [0] * (10 - len(arr))
1428 self.assertEqual(conn.send_bytes(arr), None)
1429 self.assertEqual(conn.recv_bytes_into(buffer),
1430 len(arr) * buffer.itemsize)
1431 self.assertEqual(list(buffer), expected)
1432
1433 buffer = array.array('i', [0]*10)
1434 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1435 self.assertEqual(conn.send_bytes(arr), None)
1436 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1437 len(arr) * buffer.itemsize)
1438 self.assertEqual(list(buffer), expected)
1439
1440 buffer = bytearray(latin(' ' * 40))
1441 self.assertEqual(conn.send_bytes(longmsg), None)
1442 try:
1443 res = conn.recv_bytes_into(buffer)
1444 except multiprocessing.BufferTooShort as e:
1445 self.assertEqual(e.args, (longmsg,))
1446 else:
1447 self.fail('expected BufferTooShort, got %s' % res)
1448
1449 poll = TimingWrapper(conn.poll)
1450
1451 self.assertEqual(poll(), False)
1452 self.assertTimingAlmostEqual(poll.elapsed, 0)
1453
1454 self.assertEqual(poll(TIMEOUT1), False)
1455 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1456
1457 conn.send(None)
1458
1459 self.assertEqual(poll(TIMEOUT1), True)
1460 self.assertTimingAlmostEqual(poll.elapsed, 0)
1461
1462 self.assertEqual(conn.recv(), None)
1463
1464 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1465 conn.send_bytes(really_big_msg)
1466 self.assertEqual(conn.recv_bytes(), really_big_msg)
1467
1468 conn.send_bytes(SENTINEL) # tell child to quit
1469 child_conn.close()
1470
1471 if self.TYPE == 'processes':
1472 self.assertEqual(conn.readable, True)
1473 self.assertEqual(conn.writable, True)
1474 self.assertRaises(EOFError, conn.recv)
1475 self.assertRaises(EOFError, conn.recv_bytes)
1476
1477 p.join()
1478
1479 def test_duplex_false(self):
1480 reader, writer = self.Pipe(duplex=False)
1481 self.assertEqual(writer.send(1), None)
1482 self.assertEqual(reader.recv(), 1)
1483 if self.TYPE == 'processes':
1484 self.assertEqual(reader.readable, True)
1485 self.assertEqual(reader.writable, False)
1486 self.assertEqual(writer.readable, False)
1487 self.assertEqual(writer.writable, True)
1488 self.assertRaises(IOError, reader.send, 2)
1489 self.assertRaises(IOError, writer.recv)
1490 self.assertRaises(IOError, writer.poll)
1491
1492 def test_spawn_close(self):
1493 # We test that a pipe connection can be closed by parent
1494 # process immediately after child is spawned. On Windows this
1495 # would have sometimes failed on old versions because
1496 # child_conn would be closed before the child got a chance to
1497 # duplicate it.
1498 conn, child_conn = self.Pipe()
1499
1500 p = self.Process(target=self._echo, args=(child_conn,))
1501 p.start()
1502 child_conn.close() # this might complete before child initializes
1503
1504 msg = latin('hello')
1505 conn.send_bytes(msg)
1506 self.assertEqual(conn.recv_bytes(), msg)
1507
1508 conn.send_bytes(SENTINEL)
1509 conn.close()
1510 p.join()
1511
1512 def test_sendbytes(self):
1513 if self.TYPE != 'processes':
1514 return
1515
1516 msg = latin('abcdefghijklmnopqrstuvwxyz')
1517 a, b = self.Pipe()
1518
1519 a.send_bytes(msg)
1520 self.assertEqual(b.recv_bytes(), msg)
1521
1522 a.send_bytes(msg, 5)
1523 self.assertEqual(b.recv_bytes(), msg[5:])
1524
1525 a.send_bytes(msg, 7, 8)
1526 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1527
1528 a.send_bytes(msg, 26)
1529 self.assertEqual(b.recv_bytes(), latin(''))
1530
1531 a.send_bytes(msg, 26, 0)
1532 self.assertEqual(b.recv_bytes(), latin(''))
1533
1534 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1535
1536 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1537
1538 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1539
1540 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1541
1542 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1543
Benjamin Petersone711caf2008-06-11 16:44:04 +00001544class _TestListenerClient(BaseTestCase):
1545
1546 ALLOWED_TYPES = ('processes', 'threads')
1547
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001548 @classmethod
1549 def _test(cls, address):
1550 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001551 conn.send('hello')
1552 conn.close()
1553
1554 def test_listener_client(self):
1555 for family in self.connection.families:
1556 l = self.connection.Listener(family=family)
1557 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001558 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001559 p.start()
1560 conn = l.accept()
1561 self.assertEqual(conn.recv(), 'hello')
1562 p.join()
1563 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001564#
1565# Test of sending connection and socket objects between processes
1566#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001567"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001568class _TestPicklingConnections(BaseTestCase):
1569
1570 ALLOWED_TYPES = ('processes',)
1571
1572 def _listener(self, conn, families):
1573 for fam in families:
1574 l = self.connection.Listener(family=fam)
1575 conn.send(l.address)
1576 new_conn = l.accept()
1577 conn.send(new_conn)
1578
1579 if self.TYPE == 'processes':
1580 l = socket.socket()
1581 l.bind(('localhost', 0))
1582 conn.send(l.getsockname())
1583 l.listen(1)
1584 new_conn, addr = l.accept()
1585 conn.send(new_conn)
1586
1587 conn.recv()
1588
1589 def _remote(self, conn):
1590 for (address, msg) in iter(conn.recv, None):
1591 client = self.connection.Client(address)
1592 client.send(msg.upper())
1593 client.close()
1594
1595 if self.TYPE == 'processes':
1596 address, msg = conn.recv()
1597 client = socket.socket()
1598 client.connect(address)
1599 client.sendall(msg.upper())
1600 client.close()
1601
1602 conn.close()
1603
1604 def test_pickling(self):
1605 try:
1606 multiprocessing.allow_connection_pickling()
1607 except ImportError:
1608 return
1609
1610 families = self.connection.families
1611
1612 lconn, lconn0 = self.Pipe()
1613 lp = self.Process(target=self._listener, args=(lconn0, families))
1614 lp.start()
1615 lconn0.close()
1616
1617 rconn, rconn0 = self.Pipe()
1618 rp = self.Process(target=self._remote, args=(rconn0,))
1619 rp.start()
1620 rconn0.close()
1621
1622 for fam in families:
1623 msg = ('This connection uses family %s' % fam).encode('ascii')
1624 address = lconn.recv()
1625 rconn.send((address, msg))
1626 new_conn = lconn.recv()
1627 self.assertEqual(new_conn.recv(), msg.upper())
1628
1629 rconn.send(None)
1630
1631 if self.TYPE == 'processes':
1632 msg = latin('This connection uses a normal socket')
1633 address = lconn.recv()
1634 rconn.send((address, msg))
1635 if hasattr(socket, 'fromfd'):
1636 new_conn = lconn.recv()
1637 self.assertEqual(new_conn.recv(100), msg.upper())
1638 else:
1639 # XXX On Windows with Py2.6 need to backport fromfd()
1640 discard = lconn.recv_bytes()
1641
1642 lconn.send(None)
1643
1644 rconn.close()
1645 lconn.close()
1646
1647 lp.join()
1648 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001649"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001650#
1651#
1652#
1653
1654class _TestHeap(BaseTestCase):
1655
1656 ALLOWED_TYPES = ('processes',)
1657
1658 def test_heap(self):
1659 iterations = 5000
1660 maxblocks = 50
1661 blocks = []
1662
1663 # create and destroy lots of blocks of different sizes
1664 for i in range(iterations):
1665 size = int(random.lognormvariate(0, 1) * 1000)
1666 b = multiprocessing.heap.BufferWrapper(size)
1667 blocks.append(b)
1668 if len(blocks) > maxblocks:
1669 i = random.randrange(maxblocks)
1670 del blocks[i]
1671
1672 # get the heap object
1673 heap = multiprocessing.heap.BufferWrapper._heap
1674
1675 # verify the state of the heap
1676 all = []
1677 occupied = 0
1678 for L in list(heap._len_to_seq.values()):
1679 for arena, start, stop in L:
1680 all.append((heap._arenas.index(arena), start, stop,
1681 stop-start, 'free'))
1682 for arena, start, stop in heap._allocated_blocks:
1683 all.append((heap._arenas.index(arena), start, stop,
1684 stop-start, 'occupied'))
1685 occupied += (stop-start)
1686
1687 all.sort()
1688
1689 for i in range(len(all)-1):
1690 (arena, start, stop) = all[i][:3]
1691 (narena, nstart, nstop) = all[i+1][:3]
1692 self.assertTrue((arena != narena and nstart == 0) or
1693 (stop == nstart))
1694
1695#
1696#
1697#
1698
Benjamin Petersone711caf2008-06-11 16:44:04 +00001699class _Foo(Structure):
1700 _fields_ = [
1701 ('x', c_int),
1702 ('y', c_double)
1703 ]
1704
1705class _TestSharedCTypes(BaseTestCase):
1706
1707 ALLOWED_TYPES = ('processes',)
1708
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00001709 def setUp(self):
1710 if not HAS_SHAREDCTYPES:
1711 self.skipTest("requires multiprocessing.sharedctypes")
1712
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001713 @classmethod
1714 def _double(cls, x, y, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001715 x.value *= 2
1716 y.value *= 2
1717 foo.x *= 2
1718 foo.y *= 2
1719 string.value *= 2
1720 for i in range(len(arr)):
1721 arr[i] *= 2
1722
1723 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001724 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001725 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001726 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001727 arr = self.Array('d', list(range(10)), lock=lock)
1728 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00001729 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001730
1731 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1732 p.start()
1733 p.join()
1734
1735 self.assertEqual(x.value, 14)
1736 self.assertAlmostEqual(y.value, 2.0/3.0)
1737 self.assertEqual(foo.x, 6)
1738 self.assertAlmostEqual(foo.y, 4.0)
1739 for i in range(10):
1740 self.assertAlmostEqual(arr[i], i*2)
1741 self.assertEqual(string.value, latin('hellohello'))
1742
1743 def test_synchronize(self):
1744 self.test_sharedctypes(lock=True)
1745
1746 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001747 foo = _Foo(2, 5.0)
Brian Curtinafa88b52010-10-07 01:12:19 +00001748 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001749 foo.x = 0
1750 foo.y = 0
1751 self.assertEqual(bar.x, 2)
1752 self.assertAlmostEqual(bar.y, 5.0)
1753
1754#
1755#
1756#
1757
1758class _TestFinalize(BaseTestCase):
1759
1760 ALLOWED_TYPES = ('processes',)
1761
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001762 @classmethod
1763 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001764 class Foo(object):
1765 pass
1766
1767 a = Foo()
1768 util.Finalize(a, conn.send, args=('a',))
1769 del a # triggers callback for a
1770
1771 b = Foo()
1772 close_b = util.Finalize(b, conn.send, args=('b',))
1773 close_b() # triggers callback for b
1774 close_b() # does nothing because callback has already been called
1775 del b # does nothing because callback has already been called
1776
1777 c = Foo()
1778 util.Finalize(c, conn.send, args=('c',))
1779
1780 d10 = Foo()
1781 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1782
1783 d01 = Foo()
1784 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1785 d02 = Foo()
1786 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1787 d03 = Foo()
1788 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1789
1790 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1791
1792 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1793
Ezio Melotti13925002011-03-16 11:05:33 +02001794 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00001795 # garbage collecting locals
1796 util._exit_function()
1797 conn.close()
1798 os._exit(0)
1799
1800 def test_finalize(self):
1801 conn, child_conn = self.Pipe()
1802
1803 p = self.Process(target=self._test_finalize, args=(child_conn,))
1804 p.start()
1805 p.join()
1806
1807 result = [obj for obj in iter(conn.recv, 'STOP')]
1808 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1809
1810#
1811# Test that from ... import * works for each module
1812#
1813
1814class _TestImportStar(BaseTestCase):
1815
1816 ALLOWED_TYPES = ('processes',)
1817
1818 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001819 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00001820 'multiprocessing', 'multiprocessing.connection',
1821 'multiprocessing.heap', 'multiprocessing.managers',
1822 'multiprocessing.pool', 'multiprocessing.process',
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001823 'multiprocessing.reduction',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001824 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001825 ]
1826
1827 if c_int is not None:
1828 # This module requires _ctypes
1829 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001830
1831 for name in modules:
1832 __import__(name)
1833 mod = sys.modules[name]
1834
1835 for attr in getattr(mod, '__all__', ()):
1836 self.assertTrue(
1837 hasattr(mod, attr),
1838 '%r does not have attribute %r' % (mod, attr)
1839 )
1840
1841#
1842# Quick test that logging works -- does not test logging output
1843#
1844
1845class _TestLogging(BaseTestCase):
1846
1847 ALLOWED_TYPES = ('processes',)
1848
1849 def test_enable_logging(self):
1850 logger = multiprocessing.get_logger()
1851 logger.setLevel(util.SUBWARNING)
1852 self.assertTrue(logger is not None)
1853 logger.debug('this will not be printed')
1854 logger.info('nor will this')
1855 logger.setLevel(LOG_LEVEL)
1856
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001857 @classmethod
1858 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001859 logger = multiprocessing.get_logger()
1860 conn.send(logger.getEffectiveLevel())
1861
1862 def test_level(self):
1863 LEVEL1 = 32
1864 LEVEL2 = 37
1865
1866 logger = multiprocessing.get_logger()
1867 root_logger = logging.getLogger()
1868 root_level = root_logger.level
1869
1870 reader, writer = multiprocessing.Pipe(duplex=False)
1871
1872 logger.setLevel(LEVEL1)
1873 self.Process(target=self._test_level, args=(writer,)).start()
1874 self.assertEqual(LEVEL1, reader.recv())
1875
1876 logger.setLevel(logging.NOTSET)
1877 root_logger.setLevel(LEVEL2)
1878 self.Process(target=self._test_level, args=(writer,)).start()
1879 self.assertEqual(LEVEL2, reader.recv())
1880
1881 root_logger.setLevel(root_level)
1882 logger.setLevel(level=LOG_LEVEL)
1883
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001884
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00001885# class _TestLoggingProcessName(BaseTestCase):
1886#
1887# def handle(self, record):
1888# assert record.processName == multiprocessing.current_process().name
1889# self.__handled = True
1890#
1891# def test_logging(self):
1892# handler = logging.Handler()
1893# handler.handle = self.handle
1894# self.__handled = False
1895# # Bypass getLogger() and side-effects
1896# logger = logging.getLoggerClass()(
1897# 'multiprocessing.test.TestLoggingProcessName')
1898# logger.addHandler(handler)
1899# logger.propagate = False
1900#
1901# logger.warn('foo')
1902# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001903
Benjamin Petersone711caf2008-06-11 16:44:04 +00001904#
Jesse Noller6214edd2009-01-19 16:23:53 +00001905# Test to verify handle verification, see issue 3321
1906#
1907
1908class TestInvalidHandle(unittest.TestCase):
1909
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001910 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00001911 def test_invalid_handles(self):
Jesse Noller6214edd2009-01-19 16:23:53 +00001912 conn = _multiprocessing.Connection(44977608)
1913 self.assertRaises(IOError, conn.poll)
1914 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001915
Jesse Noller6214edd2009-01-19 16:23:53 +00001916#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001917# Functions used to create test cases from the base ones in this module
1918#
1919
1920def get_attributes(Source, names):
1921 d = {}
1922 for name in names:
1923 obj = getattr(Source, name)
1924 if type(obj) == type(get_attributes):
1925 obj = staticmethod(obj)
1926 d[name] = obj
1927 return d
1928
1929def create_test_cases(Mixin, type):
1930 result = {}
1931 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001932 Type = type.capitalize()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001933
1934 for name in list(glob.keys()):
1935 if name.startswith('_Test'):
1936 base = glob[name]
1937 if type in base.ALLOWED_TYPES:
1938 newname = 'With' + Type + name[1:]
1939 class Temp(base, unittest.TestCase, Mixin):
1940 pass
1941 result[newname] = Temp
1942 Temp.__name__ = newname
1943 Temp.__module__ = Mixin.__module__
1944 return result
1945
1946#
1947# Create test cases
1948#
1949
1950class ProcessesMixin(object):
1951 TYPE = 'processes'
1952 Process = multiprocessing.Process
1953 locals().update(get_attributes(multiprocessing, (
1954 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1955 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1956 'RawArray', 'current_process', 'active_children', 'Pipe',
1957 'connection', 'JoinableQueue'
1958 )))
1959
1960testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1961globals().update(testcases_processes)
1962
1963
1964class ManagerMixin(object):
1965 TYPE = 'manager'
1966 Process = multiprocessing.Process
1967 manager = object.__new__(multiprocessing.managers.SyncManager)
1968 locals().update(get_attributes(manager, (
1969 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1970 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1971 'Namespace', 'JoinableQueue'
1972 )))
1973
1974testcases_manager = create_test_cases(ManagerMixin, type='manager')
1975globals().update(testcases_manager)
1976
1977
1978class ThreadsMixin(object):
1979 TYPE = 'threads'
1980 Process = multiprocessing.dummy.Process
1981 locals().update(get_attributes(multiprocessing.dummy, (
1982 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1983 'Condition', 'Event', 'Value', 'Array', 'current_process',
1984 'active_children', 'Pipe', 'connection', 'dict', 'list',
1985 'Namespace', 'JoinableQueue'
1986 )))
1987
1988testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1989globals().update(testcases_threads)
1990
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001991class OtherTest(unittest.TestCase):
1992 # TODO: add more tests for deliver/answer challenge.
1993 def test_deliver_challenge_auth_failure(self):
1994 class _FakeConnection(object):
1995 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00001996 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001997 def send_bytes(self, data):
1998 pass
1999 self.assertRaises(multiprocessing.AuthenticationError,
2000 multiprocessing.connection.deliver_challenge,
2001 _FakeConnection(), b'abc')
2002
2003 def test_answer_challenge_auth_failure(self):
2004 class _FakeConnection(object):
2005 def __init__(self):
2006 self.count = 0
2007 def recv_bytes(self, size):
2008 self.count += 1
2009 if self.count == 1:
2010 return multiprocessing.connection.CHALLENGE
2011 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00002012 return b'something bogus'
2013 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002014 def send_bytes(self, data):
2015 pass
2016 self.assertRaises(multiprocessing.AuthenticationError,
2017 multiprocessing.connection.answer_challenge,
2018 _FakeConnection(), b'abc')
2019
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002020#
2021# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2022#
2023
2024def initializer(ns):
2025 ns.test += 1
2026
2027class TestInitializers(unittest.TestCase):
2028 def setUp(self):
2029 self.mgr = multiprocessing.Manager()
2030 self.ns = self.mgr.Namespace()
2031 self.ns.test = 0
2032
2033 def tearDown(self):
2034 self.mgr.shutdown()
2035
2036 def test_manager_initializer(self):
2037 m = multiprocessing.managers.SyncManager()
2038 self.assertRaises(TypeError, m.start, 1)
2039 m.start(initializer, (self.ns,))
2040 self.assertEqual(self.ns.test, 1)
2041 m.shutdown()
2042
2043 def test_pool_initializer(self):
2044 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2045 p = multiprocessing.Pool(1, initializer, (self.ns,))
2046 p.close()
2047 p.join()
2048 self.assertEqual(self.ns.test, 1)
2049
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002050#
2051# Issue 5155, 5313, 5331: Test process in processes
2052# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2053#
2054
2055def _ThisSubProcess(q):
2056 try:
2057 item = q.get(block=False)
2058 except pyqueue.Empty:
2059 pass
2060
2061def _TestProcess(q):
2062 queue = multiprocessing.Queue()
2063 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
2064 subProc.start()
2065 subProc.join()
2066
2067def _afunc(x):
2068 return x*x
2069
2070def pool_in_process():
2071 pool = multiprocessing.Pool(processes=4)
2072 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2073
2074class _file_like(object):
2075 def __init__(self, delegate):
2076 self._delegate = delegate
2077 self._pid = None
2078
2079 @property
2080 def cache(self):
2081 pid = os.getpid()
2082 # There are no race conditions since fork keeps only the running thread
2083 if pid != self._pid:
2084 self._pid = pid
2085 self._cache = []
2086 return self._cache
2087
2088 def write(self, data):
2089 self.cache.append(data)
2090
2091 def flush(self):
2092 self._delegate.write(''.join(self.cache))
2093 self._cache = []
2094
2095class TestStdinBadfiledescriptor(unittest.TestCase):
2096
2097 def test_queue_in_process(self):
2098 queue = multiprocessing.Queue()
2099 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
2100 proc.start()
2101 proc.join()
2102
2103 def test_pool_in_process(self):
2104 p = multiprocessing.Process(target=pool_in_process)
2105 p.start()
2106 p.join()
2107
2108 def test_flushing(self):
2109 sio = io.StringIO()
2110 flike = _file_like(sio)
2111 flike.write('foo')
2112 proc = multiprocessing.Process(target=lambda: flike.flush())
2113 flike.flush()
2114 assert sio.getvalue() == 'foo'
2115
2116testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
2117 TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002118
Benjamin Petersone711caf2008-06-11 16:44:04 +00002119#
2120#
2121#
2122
2123def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00002124 if sys.platform.startswith("linux"):
2125 try:
2126 lock = multiprocessing.RLock()
2127 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00002128 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00002129
Benjamin Petersone711caf2008-06-11 16:44:04 +00002130 if run is None:
2131 from test.support import run_unittest as run
2132
2133 util.get_temp_dir() # creates temp directory for use by all processes
2134
2135 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2136
Benjamin Peterson41181742008-07-02 20:22:54 +00002137 ProcessesMixin.pool = multiprocessing.Pool(4)
2138 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2139 ManagerMixin.manager.__init__()
2140 ManagerMixin.manager.start()
2141 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002142
2143 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00002144 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2145 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002146 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2147 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00002148 )
2149
2150 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2151 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
2152 run(suite)
2153
Benjamin Peterson41181742008-07-02 20:22:54 +00002154 ThreadsMixin.pool.terminate()
2155 ProcessesMixin.pool.terminate()
2156 ManagerMixin.pool.terminate()
2157 ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002158
Benjamin Peterson41181742008-07-02 20:22:54 +00002159 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002160
2161def main():
2162 test_main(unittest.TextTestRunner(verbosity=2).run)
2163
2164if __name__ == '__main__':
2165 main()