#!/usr/bin/env python3

#
# Unit tests for the multiprocessing package
#

import unittest
import queue as pyqueue
import time
import io
import sys
import os
import gc
import signal
import array
import socket
import random
import logging
import test.support


# Skip tests if _multiprocessing wasn't built.
_multiprocessing = test.support.import_module('_multiprocessing')
# Skip tests if sem_open implementation is broken.
test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
import threading

import multiprocessing.dummy
import multiprocessing.connection
import multiprocessing.managers
import multiprocessing.heap
import multiprocessing.pool

from multiprocessing import util

# The sharedctypes tests are skipped when this import is unavailable.
try:
    from multiprocessing.sharedctypes import Value, copy
    HAS_SHAREDCTYPES = True
except ImportError:
    HAS_SHAREDCTYPES = False

#
#
#

def latin(s):
    """Return the str *s* encoded to bytes with the 'latin' codec."""
    return s.encode('latin')

#
# Constants
#

LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# True unless the extension module reports a broken sem_getvalue().
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = (sys.platform == "win32")

#
# Some tests require ctypes
#

try:
    from ctypes import Structure, c_int, c_double
except ImportError:
    # Placeholders so the module still imports; tests check ``c_int is None``.
    Structure = object
    c_int = c_double = None

#
# Creates a wrapper for a function which records the time it takes to finish
#

class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* took.

    After a call, ``elapsed`` holds the wall-clock duration in seconds of
    the most recent call; it is None until the wrapper has been called.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        """Call the wrapped function and return its result.

        ``elapsed`` is updated even when the wrapped function raises.
        """
        t = time.time()
        try:
            return self.func(*args, **kwds)
        finally:
            self.elapsed = time.time() - t

#
# Base class for test cases
#

class BaseTestCase(object):
    """Base class for the test case classes in this file.

    The helpers call ``self.assertEqual`` / ``self.assertAlmostEqual``,
    which are not defined here and must be provided by the concrete class.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare two durations to one decimal place if CHECK_TIMINGS is set."""
        if CHECK_TIMINGS:
            self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert ``func(*args) == value`` unless it raises NotImplementedError."""
        try:
            res = func(*args)
        except NotImplementedError:
            pass
        else:
            return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__

#
# Return the value of a semaphore
#

def get_value(self):
    """Return the current counter value of the semaphore-like object *self*.

    Tries, in order, the ``get_value()`` method, then the
    ``_Semaphore__value`` attribute, then the ``_value`` attribute.

    Raises NotImplementedError if none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    # Fall back to the private attributes, in the original lookup order.
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            pass
    raise NotImplementedError

#
# Testcases
#

class _TestProcess(BaseTestCase):
    """Tests for Process objects, current_process() and active_children()."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # Attributes of the current process; not checked for threads.
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def test_daemon_argument(self):
        if self.TYPE == "threads":
            return

        # By default uses the current process's daemon flag.
        proc0 = self.Process(target=self._test)
        self.assertEqual(proc0.daemon, self.current_process().daemon)
        proc1 = self.Process(target=self._test, daemon=True)
        self.assertTrue(proc1.daemon)
        proc2 = self.Process(target=self._test, daemon=False)
        self.assertFalse(proc2.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child target: report args, kwds and process identity through *q*."""
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            # authkey and pid are only reported for real processes.
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        # Process state before start(), while running and after join().
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # Values arrive in the order _test() puts them.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        """Child target: sleep long enough that the parent must terminate it."""
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Send *id* on *wconn*, then recurse in child processes.

        While *id* has fewer than two elements, two children are started
        one after the other (each joined before the next starts), with
        ``id + [0]`` and ``id + [1]``.
        """
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        # Depth-first order, because each child is joined before the next.
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

#
#
#

class _UpperCaser(multiprocessing.Process):
    """Process subclass that upper-cases strings sent to it over a pipe."""

    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.child_conn, self.parent_conn = multiprocessing.Pipe()

    def run(self):
        """Echo each received string back upper-cased until None arrives."""
        self.parent_conn.close()
        for s in iter(self.child_conn.recv, None):
            self.child_conn.send(s.upper())
        self.child_conn.close()

    def submit(self, s):
        """Send the str *s* through the pipe and return the reply."""
        assert type(s) is str
        self.parent_conn.send(s)
        return self.parent_conn.recv()

    def stop(self):
        """Send the None sentinel that ends run(), then close both ends."""
        self.parent_conn.send(None)
        self.parent_conn.close()
        self.child_conn.close()

class _TestSubclassingProcess(BaseTestCase):
    """Test that a Process subclass (_UpperCaser) can be started and used."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        uppercaser = _UpperCaser()
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()

#
#
#

def queue_empty(q):
    """Return whether queue *q* is empty.

    Uses ``q.empty()`` when the queue has that method, otherwise compares
    ``q.qsize()`` with zero.
    """
    if hasattr(q, 'empty'):
        return q.empty()
    else:
        return q.qsize() == 0

def queue_full(q, maxsize):
    """Return whether queue *q* is full.

    Uses ``q.full()`` when the queue has that method, otherwise compares
    ``q.qsize()`` with *maxsize*.
    """
    if hasattr(q, 'full'):
        return q.full()
    else:
        return q.qsize() == maxsize


class _TestQueue(BaseTestCase):
    """Tests for Queue and JoinableQueue objects."""

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        """Child target: once released, take six items, then signal the parent."""
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        # Fill a bounded queue, check every put() variant raises Full, then
        # let the child drain it.
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        """Child target: once released, put 2..5 on the queue and signal the parent."""
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        # Read back the child's items with every get() variant, then check
        # that each variant raises Empty on the drained queue.
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        """Child target: put the integers 10..19 on the queue."""
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() is not implemented here; nothing more to check.
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        """Worker target: acknowledge each item with task_done() until None arrives."""
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # One None sentinel per worker ends its loop.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()

#
#
#

class _TestLock(BaseTestCase):
    """Tests for Lock and RLock objects."""

    def test_lock(self):
        # A held Lock cannot be re-acquired without blocking, and releasing
        # an unheld Lock raises.
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        # An RLock can be acquired repeatedly; one release too many raises.
        lock = self.RLock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        with self.Lock():
            pass


class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore objects."""

    def _test_semaphore(self, sem):
        """Acquire and release *sem*, checking its value at each step.

        *sem* must start with the value 2 and is left with the value 2.
        """
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        # A plain Semaphore may be released beyond its initial value.
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        # Only run for the 'processes' type.
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)


class _TestCondition(BaseTestCase):
    """Tests for Condition objects."""

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        """Waiter target: wait on *cond*, signalling through two semaphores.

        *sleeping* is released just before waiting and *woken* just after
        the wait returns (by notification or by *timeout*).
        """
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()

        t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        t.daemon = True
        t.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        # Only the thread is joined here, as before.
        t.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        # wait() with nobody notifying must return False after the timeout.
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)


class _TestEvent(BaseTestCase):
    """Tests for Event objects."""

    @classmethod
    def _test_event(cls, event):
        """Child target: sleep for TIMEOUT2, then set *event*."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Removed temporarily, due to API shear, this does not
        # work with threading._Event objects. is_set == isSet
        # NOTE(review): the assertion below is active, so the comment above
        # looks stale.
        self.assertEqual(event.is_set(), False)

        # Removed, threading.Event.wait() will return the value of the __flag
        # instead of None. API Shear with the semaphore backed mp.Event
        # NOTE(review): these assertions are active too.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # The child sets the event after TIMEOUT2; wait() must then return True.
        self.Process(target=self._test_event, args=(event,)).start()
        self.assertEqual(wait(), True)

#
#
#

class _TestValue(BaseTestCase):
    """Tests for the shared Value and RawValue objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value assigned by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        """Child target: assign each shared value its third codes_values entry."""
        for sv, cv in zip(values, cls.codes_values):
            sv.value = cv[2]


    def test_value(self, raw=False):
        # Values changed by the child process must be visible in the parent.
        if raw:
            values = [self.RawValue(code, value)
                      for code, value, _ in self.codes_values]
        else:
            values = [self.Value(code, value)
                      for code, value, _ in self.codes_values]

        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.start()
        proc.join()

        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # Default lock: get_lock() and get_obj() must be callable.
        val1 = self.Value('i', 5)
        val1.get_lock()
        val1.get_obj()

        # lock=None: same accessors must be callable.
        val2 = self.Value('i', 5, lock=None)
        val2.get_lock()
        val2.get_obj()

        # An explicit lock is the one returned by get_lock().
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False gives an object without the accessors.
        val4 = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(val4, 'get_lock'))
        self.assertFalse(hasattr(val4, 'get_obj'))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        val5 = self.RawValue('i', 5)
        self.assertFalse(hasattr(val5, 'get_lock'))
        self.assertFalse(hasattr(val5, 'get_obj'))


class _TestArray(BaseTestCase):
    """Tests for the shared Array and RawArray objects."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        """Replace *seq* in place with its running (cumulative) sums."""
        for i in range(1, len(seq)):
            seq[i] += seq[i-1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        if raw:
            arr = self.RawArray('i', seq)
        else:
            arr = self.Array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        # Apply f() to the list here and to the shared array in a child
        # process; both must end up with the same contents.
        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default lock: get_lock() and get_obj() must be callable.
        arr1 = self.Array('i', list(range(10)))
        arr1.get_lock()
        arr1.get_obj()

        # lock=None: same accessors must be callable.
        arr2 = self.Array('i', list(range(10)), lock=None)
        arr2.get_lock()
        arr2.get_obj()

        # An explicit lock is the one returned by get_lock().
        lock = self.Lock()
        arr3 = self.Array('i', list(range(10)), lock=lock)
        lock3 = arr3.get_lock()
        arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False gives an object without the accessors.
        arr4 = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        arr5 = self.RawArray('i', range(10))
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))

961#
962#
963#
964
class _TestContainers(BaseTestCase):
    """Checks for the ``list``, ``dict`` and ``Namespace`` shared objects."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        """Exercise slicing, extension, repetition and nesting of lists."""
        a = self.list(list(range(10)))
        self.assertEqual(a[:], list(range(10)))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        # A slice running past the end is truncated.
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        # None of the operations on b may have touched a.
        self.assertEqual(a[:], list(range(10)))

        # A shared list built from other shared lists compares equal to
        # the corresponding nested plain lists.
        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A later change to `a` is visible through the list that holds it.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        """Check copy(), keys(), values() and items() of a shared dict."""
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), {i: chr(i) for i in indices})
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_namespace(self):
        """Check attribute set/delete and the str() form of a Namespace."""
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The underscore-prefixed attribute is expected to be left out.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1020
1021#
1022#
1023#
1024
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return the square of *x*."""
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001028
class _TestPool(BaseTestCase):
    """Checks for the pool object available as ``self.pool``."""

    def test_apply(self):
        """``apply`` passes positional and keyword arguments through."""
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        """``map`` matches the builtin, with and without a chunksize."""
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, list(range(100)))))

    def test_map_chunksize(self):
        """``map_async`` on an empty list with a chunksize must not stall."""
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        """``apply_async(...).get()`` blocks until the task has finished."""
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        """``get(timeout=...)`` raises TimeoutError for a slower task."""
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        """``imap`` yields results in order and then stops."""
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        """``imap_unordered`` yields every result, in any order."""
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

    def test_make_pool(self):
        """``Pool(3)`` starts exactly three workers."""
        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Release the workers even when the assertion fails.
            p.close()
            p.join()

    def test_terminate(self):
        """``terminate()`` lets ``join()`` return without draining the queue."""
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        result = self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        # assertLess reports both values when the limit is exceeded.
        self.assertLess(join.elapsed, 0.2)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001104
def raising():
    """Always raise ``KeyError("key")``."""
    error = KeyError("key")
    raise error
Jesse Noller1f0b6582010-01-27 03:36:01 +00001107
def unpickleable_result():
    """Return a lambda, i.e. an object that pickle refuses to serialize."""
    # A lambda has no importable name, so pickling it fails.
    answer = lambda: 42
    return answer
1110
class _TestPoolWorkerErrors(BaseTestCase):
    """Checks for how a pool reports errors raised by its workers."""

    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        """``error_callback`` receives the exception raised by the task."""
        p = multiprocessing.Pool(2)
        try:
            scratchpad = [None]
            def errback(exc):
                scratchpad[0] = exc

            res = p.apply_async(raising, error_callback=errback)
            self.assertRaises(KeyError, res.get)
            self.assertTrue(scratchpad[0])
            self.assertIsInstance(scratchpad[0], KeyError)
        finally:
            # Release the workers even when an assertion fails.
            p.close()
            p.join()

    def test_unpickleable_result(self):
        """An unpicklable result is reported as ``MaybeEncodingError``."""
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)
        try:
            # Make sure we don't lose pool processes because of encoding
            # errors.
            for iteration in range(20):

                scratchpad = [None]
                def errback(exc):
                    scratchpad[0] = exc

                res = p.apply_async(unpickleable_result,
                                    error_callback=errback)
                self.assertRaises(MaybeEncodingError, res.get)
                wrapped = scratchpad[0]
                self.assertTrue(wrapped)
                self.assertIsInstance(scratchpad[0], MaybeEncodingError)
                self.assertIsNotNone(wrapped.exc)
                self.assertIsNotNone(wrapped.value)
        finally:
            # Release the workers even when an assertion fails.
            p.close()
            p.join()
1150
class _TestPoolWorkerLifetime(BaseTestCase):
    """Checks that ``maxtasksperchild`` causes workers to be replaced."""

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        """Worker pids must change once each worker hit its task limit."""
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        try:
            self.assertEqual(3, len(p._pool))
            origworkerpids = [w.pid for w in p._pool]
            # Run many tasks so each worker gets replaced (hopefully)
            results = []
            for i in range(100):
                results.append(p.apply_async(sqr, (i, )))
            # Fetch the results and verify we got the right answers,
            # also ensuring all the tasks have completed.
            for (j, res) in enumerate(results):
                self.assertEqual(res.get(), sqr(j))
            # Refill the pool
            p._repopulate_pool()
            # Wait until all workers are alive
            countdown = 5
            while countdown and not all(w.is_alive() for w in p._pool):
                countdown -= 1
                time.sleep(DELTA)
            finalworkerpids = [w.pid for w in p._pool]
            # All pids should be assigned.  See issue #7805.
            self.assertNotIn(None, origworkerpids)
            self.assertNotIn(None, finalworkerpids)
            # Finally, check that the worker pids have changed
            self.assertNotEqual(sorted(origworkerpids),
                                sorted(finalworkerpids))
        finally:
            # Release the workers even when an assertion fails.
            p.close()
            p.join()
1181
Benjamin Petersone711caf2008-06-11 16:44:04 +00001182#
1183# Test that manager has expected number of shared objects left
1184#
1185
class _TestZZZNumberOfObjects(BaseTestCase):
    """Check how many shared objects the manager still holds."""
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        """Exactly one shared object (the pool) may be left."""
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Fetched right after the count so that both describe the same state.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the snapshot once to help diagnose the failure below.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1204
1205#
1206# Test of creating a customized manager class
1207#
1208
1209from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1210
class FooBar(object):
    """Small class whose methods are exposed through ``MyManager``."""

    def f(self):
        """Return the string ``'f()'``."""
        label = 'f()'
        return label

    def g(self):
        """Always raise ``ValueError``."""
        raise ValueError()

    def _h(self):
        """Return the string ``'_h()'``."""
        label = '_h()'
        return label
1218
def baz():
    """Generate the squares of 0 through 9."""
    root = 0
    while root < 10:
        yield root * root
        root += 1
1222
class IteratorProxy(BaseProxy):
    """Proxy that makes its referent usable as an iterator."""

    # The only method of the referent reachable through this proxy.
    _exposed_ = ('__next__',)

    def __iter__(self):
        # The proxy acts as its own iterator.
        return self

    def __next__(self):
        # Each step is forwarded to the referent via _callmethod().
        value = self._callmethod('__next__')
        return value
1229
class MyManager(BaseManager):
    """Customized manager; its shared types are registered below."""


# 'Foo' uses the default set of exposed methods.
MyManager.register('Foo', callable=FooBar)
# 'Bar' exposes exactly f() and _h().
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
# 'baz' hands out the generator through IteratorProxy.
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1236
1237
class _TestMyManager(BaseTestCase):
    """Checks for the customized ``MyManager`` class."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        """Check which methods the ``Foo``, ``Bar`` and ``baz`` proxies expose."""
        manager = MyManager()
        manager.start()
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            baz = manager.baz()

            foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
            bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]

            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            # _h is not exposed on Foo, so calling it is refused.
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(baz), [i*i for i in range(10)])
        finally:
            # Stop the manager even when an assertion fails.
            manager.shutdown()
1269
1270#
1271# Test of connecting to a remote server and using xmlrpclib for serialization
1272#
1273
# The single queue object handed out by get_queue().
_queue = pyqueue.Queue()

def get_queue():
    """Return the module-level queue (the same object on every call)."""
    shared = _queue
    return shared
1277
class QueueManager(BaseManager):
    """Manager class used by the server process."""


# get_queue() of this manager is backed by the module-level function.
QueueManager.register('get_queue', callable=get_queue)
1281
class QueueManager2(BaseManager):
    """Manager class which specifies the same interface as QueueManager."""


# Registered without a callable: only the name is declared here.
QueueManager2.register('get_queue')
1285
1286
# Serializer name passed to every manager created in the tests below.
SERIALIZER = 'xmlrpclib'

class _TestRemoteManager(BaseTestCase):
    """Check connecting to a running manager with the xmlrpclib serializer."""

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        """Connect to the manager at *address* and put one tuple on its queue."""
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        """Data put by another process arrives through a second connection."""
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.start()

            manager2 = QueueManager2(
                address=manager.address, authkey=authkey, serializer=SERIALIZER
                )
            manager2.connect()
            queue = manager2.get_queue()

            # Note that xmlrpclib will deserialize object as a list not a tuple
            received = queue.get()
            # The item has arrived, so the child has finished its put();
            # reap it so that no child process is left behind.
            p.join()
            self.assertEqual(received, ['hello world', None, True, 2.25])

            # Because we are using xmlrpclib for serialization instead of
            # pickle this will cause a serialization error.
            self.assertRaises(Exception, queue.put, time.sleep)

            # Make queue finalizer run before the server is stopped
            del queue
        finally:
            # Stop the server even when an assertion fails.
            manager.shutdown()
1329
class _TestManagerRestart(BaseTestCase):
    """Check that a manager can be restarted on the address it just used."""

    @classmethod
    def _putter(cls, address, authkey):
        """Connect to the manager at *address* and put one string on its queue."""
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        """Shut a manager down and start a new one on the same address."""
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.start()
            queue = manager.get_queue()
            received = queue.get()
            # The item has arrived, so the child has finished its put();
            # reap it so that no child process is left behind.
            p.join()
            self.assertEqual(received, 'hello world')
            del queue
        finally:
            # Stop the first server even when the assertion fails.
            manager.shutdown()
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001361
Benjamin Petersone711caf2008-06-11 16:44:04 +00001362#
1363#
1364#
1365
# Empty message that tells the echoing child to stop.
SENTINEL = latin('')

class _TestConnection(BaseTestCase):
    """Checks for the connection objects returned by ``Pipe()``."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        """Send every received byte message back until SENTINEL arrives."""
        while True:
            data = conn.recv_bytes()
            if SENTINEL == data:
                break
            conn.send_bytes(data)
        conn.close()

    def test_connection(self):
        """Round-trip objects, bytes and buffers through an echoing child."""
        conn, child_conn = self.Pipe()

        echoer = self.Process(target=self._echo, args=(child_conn,))
        echoer.daemon = True
        echoer.start()

        uses_processes = (self.TYPE == 'processes')
        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if uses_processes:
            self.assertEqual(type(conn.fileno()), int)

        # Pickled objects and raw bytes both come back unchanged.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if uses_processes:
            # Receive into the start of a zero-filled buffer.
            buffer = array.array('i', [0]*10)
            padding = [0] * (10 - len(arr))
            expected = list(arr) + padding
            self.assertEqual(conn.send_bytes(arr), None)
            received = conn.recv_bytes_into(buffer)
            self.assertEqual(received, len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items into the buffer.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            received = conn.recv_bytes_into(buffer, 3 * buffer.itemsize)
            self.assertEqual(received, len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A buffer that is too small raises BufferTooShort, which
            # carries the complete message.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing is pending: poll() reports False, with or without timeout.
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # Once the echo is on its way, poll() reports True without
        # using up the timeout.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if uses_processes:
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            for receive in (conn.recv, conn.recv_bytes):
                self.assertRaises(EOFError, receive)

        echoer.join()

    def test_duplex_false(self):
        """A one-way pipe has a read-only end and a write-only end."""
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE != 'processes':
            return
        self.assertEqual(reader.readable, True)
        self.assertEqual(reader.writable, False)
        self.assertEqual(writer.readable, False)
        self.assertEqual(writer.writable, True)
        # Using an end in the wrong direction is an IOError.
        self.assertRaises(IOError, reader.send, 2)
        self.assertRaises(IOError, writer.recv)
        self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        """The parent may close ``child_conn`` right after starting the child."""
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        echoer = self.Process(target=self._echo, args=(child_conn,))
        echoer.start()
        child_conn.close()    # this might complete before child initializes

        greeting = latin('hello')
        conn.send_bytes(greeting)
        self.assertEqual(conn.recv_bytes(), greeting)

        conn.send_bytes(SENTINEL)
        conn.close()
        echoer.join()

    def test_sendbytes(self):
        """Check the offset and size arguments of ``send_bytes()``."""
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the length sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Out-of-range or negative offset/size combinations are rejected.
        rejected = ((27,), (22, 5), (26, 1), (-1,), (4, -1))
        for bounds in rejected:
            self.assertRaises(ValueError, a.send_bytes, msg, *bounds)
1517
class _TestListenerClient(BaseTestCase):
    """Checks for ``Listener``/``Client`` pairs of every address family."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        """Connect to *address*, send ``'hello'`` and close the connection."""
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        """A listener of each family accepts one client and reads its message."""
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            sender = self.Process(target=self._test,
                                  args=(listener.address,))
            sender.daemon = True
            sender.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            sender.join()
            listener.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001538#
1539# Test of sending connection and socket objects between processes
1540#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001541"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001542class _TestPicklingConnections(BaseTestCase):
1543
1544 ALLOWED_TYPES = ('processes',)
1545
1546 def _listener(self, conn, families):
1547 for fam in families:
1548 l = self.connection.Listener(family=fam)
1549 conn.send(l.address)
1550 new_conn = l.accept()
1551 conn.send(new_conn)
1552
1553 if self.TYPE == 'processes':
1554 l = socket.socket()
1555 l.bind(('localhost', 0))
1556 conn.send(l.getsockname())
1557 l.listen(1)
1558 new_conn, addr = l.accept()
1559 conn.send(new_conn)
1560
1561 conn.recv()
1562
1563 def _remote(self, conn):
1564 for (address, msg) in iter(conn.recv, None):
1565 client = self.connection.Client(address)
1566 client.send(msg.upper())
1567 client.close()
1568
1569 if self.TYPE == 'processes':
1570 address, msg = conn.recv()
1571 client = socket.socket()
1572 client.connect(address)
1573 client.sendall(msg.upper())
1574 client.close()
1575
1576 conn.close()
1577
1578 def test_pickling(self):
1579 try:
1580 multiprocessing.allow_connection_pickling()
1581 except ImportError:
1582 return
1583
1584 families = self.connection.families
1585
1586 lconn, lconn0 = self.Pipe()
1587 lp = self.Process(target=self._listener, args=(lconn0, families))
1588 lp.start()
1589 lconn0.close()
1590
1591 rconn, rconn0 = self.Pipe()
1592 rp = self.Process(target=self._remote, args=(rconn0,))
1593 rp.start()
1594 rconn0.close()
1595
1596 for fam in families:
1597 msg = ('This connection uses family %s' % fam).encode('ascii')
1598 address = lconn.recv()
1599 rconn.send((address, msg))
1600 new_conn = lconn.recv()
1601 self.assertEqual(new_conn.recv(), msg.upper())
1602
1603 rconn.send(None)
1604
1605 if self.TYPE == 'processes':
1606 msg = latin('This connection uses a normal socket')
1607 address = lconn.recv()
1608 rconn.send((address, msg))
1609 if hasattr(socket, 'fromfd'):
1610 new_conn = lconn.recv()
1611 self.assertEqual(new_conn.recv(100), msg.upper())
1612 else:
1613 # XXX On Windows with Py2.6 need to backport fromfd()
1614 discard = lconn.recv_bytes()
1615
1616 lconn.send(None)
1617
1618 rconn.close()
1619 lconn.close()
1620
1621 lp.join()
1622 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001623"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001624#
1625#
1626#
1627
class _TestHeap(BaseTestCase):
    """Consistency check for the ``multiprocessing.heap`` allocator."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        """Free and occupied blocks must tile every arena without gaps."""
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                # Drop a random block so that at most maxblocks stay alive.
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap: gather every free and occupied
        # block as (arena index, start, stop, length, state)
        segments = []
        for free_blocks in list(heap._len_to_seq.values()):
            for arena, start, stop in free_blocks:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Each block must either end where the next one starts, or the
        # next one must be the first block of a different arena.
        for current, following in zip(segments, segments[1:]):
            arena, stop = current[0], current[2]
            narena, nstart = following[:2]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1668
1669#
1670#
1671#
1672
class _Foo(Structure):
    """ctypes structure with an int field ``x`` and a double field ``y``."""
    _fields_ = [('x', c_int), ('y', c_double)]
1678
class _TestSharedCTypes(BaseTestCase):
    """Checks for ``multiprocessing.sharedctypes`` values and arrays."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        """Skip every test when sharedctypes could not be imported."""
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        """Double every shared argument in place."""
        for shared_value in (x, y):
            shared_value.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for idx in range(len(arr)):
            arr[idx] *= 2

    def test_sharedctypes(self, lock=False):
        """Changes made through ``self.Process`` are visible afterwards."""
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        doubler = self.Process(target=self._double,
                               args=(x, y, foo, arr, string))
        doubler.start()
        doubler.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for idx in range(10):
            self.assertAlmostEqual(arr[idx], idx*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        """Repeat ``test_sharedctypes`` with ``lock=True``."""
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        """``copy()`` returns an object independent of the original."""
        foo = _Foo(2, 5.0)
        bar = copy(foo)
        # Changing the original must not show up in the copy.
        foo.x = 0
        foo.y = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
1727
1728#
1729#
1730#
1731
class _TestFinalize(BaseTestCase):
    """Checks the order in which ``util.Finalize`` callbacks run."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        """Register finalizers that report their tag on *conn*, then exit."""
        class Foo(object):
            pass

        def report(tag, target, **options):
            # Register a finalizer for `target` that sends `tag` on conn.
            return util.Finalize(target, conn.send, args=(tag,), **options)

        a = Foo()
        report('a', a)
        del a           # triggers callback for a

        b = Foo()
        close_b = report('b', b)
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # No exit priority is given for c.
        c = Foo()
        report('c', c)

        d10 = Foo()
        report('d10', d10, exitpriority=1)

        d01 = Foo()
        report('d01', d01, exitpriority=0)
        d02 = Foo()
        report('d02', d02, exitpriority=0)
        d03 = Foo()
        report('d03', d03, exitpriority=0)

        report('e', None, exitpriority=-10)

        report('STOP', None, exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        """Collect the tags sent by the child and check their order."""
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.start()
        child.join()

        # Read messages up to, but not including, the 'STOP' marker.
        result = list(iter(conn.recv, 'STOP'))
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1783
1784#
1785# Test that from ... import * works for each module
1786#
1787
class _TestImportStar(BaseTestCase):
    """Check that every name in a module's ``__all__`` really exists."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        """Import each multiprocessing module and verify its ``__all__``."""
        modules = [
            'multiprocessing',
            'multiprocessing.connection',
            'multiprocessing.heap',
            'multiprocessing.managers',
            'multiprocessing.pool',
            'multiprocessing.process',
            'multiprocessing.reduction',
            'multiprocessing.synchronize',
            'multiprocessing.util',
            ]

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for modname in modules:
            __import__(modname)
            module = sys.modules[modname]
            exported = getattr(module, '__all__', ())

            for attr in exported:
                message = '%r does not have attribute %r' % (module, attr)
                self.assertTrue(hasattr(module, attr), message)
1814
1815#
1816# Quick test that logging works -- does not test logging output
1817#
1818
class _TestLogging(BaseTestCase):
    """Quick checks that logging works; the log output is not inspected."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        """The multiprocessing logger exists and accepts messages."""
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        try:
            self.assertIsNotNone(logger)
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Restore the level even when a check above fails.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        """Send the effective level of the multiprocessing logger on *conn*."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        """A child process reports the effective level set in the parent."""
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        try:
            # Level set directly on the multiprocessing logger.
            logger.setLevel(LEVEL1)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            reported = reader.recv()
            p.join()    # reap the child before asserting
            self.assertEqual(LEVEL1, reported)

            # With NOTSET the effective level comes from the root logger.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            reported = reader.recv()
            p.join()    # reap the child before asserting
            self.assertEqual(LEVEL2, reported)
        finally:
            # Restore both levels even when an assertion fails.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
1857
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001858
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00001859# class _TestLoggingProcessName(BaseTestCase):
1860#
1861# def handle(self, record):
1862# assert record.processName == multiprocessing.current_process().name
1863# self.__handled = True
1864#
1865# def test_logging(self):
1866# handler = logging.Handler()
1867# handler.handle = self.handle
1868# self.__handled = False
1869# # Bypass getLogger() and side-effects
1870# logger = logging.getLoggerClass()(
1871# 'multiprocessing.test.TestLoggingProcessName')
1872# logger.addHandler(handler)
1873# logger.propagate = False
1874#
1875# logger.warn('foo')
1876# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001877
Benjamin Petersone711caf2008-06-11 16:44:04 +00001878#
Jesse Noller6214edd2009-01-19 16:23:53 +00001879# Test to verify handle verification, see issue 3321
1880#
1881
class TestInvalidHandle(unittest.TestCase):
    """Check handle verification of ``Connection``, see issue 3321."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        """Invalid handles must lead to IOError."""
        bogus_handle = 44977608
        conn = _multiprocessing.Connection(bogus_handle)
        # Polling a connection built on the bogus handle fails.
        with self.assertRaises(IOError):
            conn.poll()
        # A negative handle is refused at construction time.
        with self.assertRaises(IOError):
            _multiprocessing.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001889
Jesse Noller6214edd2009-01-19 16:23:53 +00001890#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001891# Functions used to create test cases from the base ones in this module
1892#
1893
def get_attributes(Source, names):
    """Collect the attributes of *Source* listed in *names* into a dict.

    Each name is looked up with ``getattr``.  Plain Python functions are
    wrapped in ``staticmethod`` so that, when the resulting dict is merged
    into a class namespace, they are not turned into bound methods on
    attribute access.  Every other kind of object (bound methods, builtins,
    modules, classes, plain values) is stored unchanged.

    Arguments:
        Source: any object (module, instance, class) to read from.
        names: iterable of attribute names.

    Returns:
        A new dict mapping each name to the (possibly wrapped) attribute.

    Raises:
        AttributeError: if *Source* lacks one of the requested names.
    """
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        value = getattr(Source, name)
        if isinstance(value, function_type):
            value = staticmethod(value)
        attributes[name] = value
    return attributes
1902
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from the module's ``_Test*`` bases.

    Every module global whose name starts with ``_Test`` and whose
    ``ALLOWED_TYPES`` contains *type* yields one new class deriving from
    that base, ``unittest.TestCase`` and *Mixin*.  The new class is named
    ``With<Type><name without the leading underscore>`` and reports
    *Mixin*'s module as its own.

    Returns a dict mapping the generated class names to the classes.
    """
    generated = {}
    prefix = 'With' + type.capitalize()

    # Iterate over a snapshot of the module namespace.
    for name, base in list(globals().items()):
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        newname = prefix + name[1:]
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        generated[newname] = Temp

    return generated
1919
1920#
1921# Create test cases
1922#
1923
class ProcessesMixin(object):
    # Mixin binding the generic test bases to real processes: the names
    # listed below are copied from the ``multiprocessing`` package into
    # the class namespace via get_attributes().
    TYPE = 'processes'
    Process = multiprocessing.Process
    locals().update(get_attributes(
        multiprocessing,
        (
            'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
            'Condition', 'Event', 'Value', 'Array', 'RawValue',
            'RawArray', 'current_process', 'active_children', 'Pipe',
            'connection', 'JoinableQueue'
        ),
    ))


# Generate the process-based test cases and publish them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1936
1937
class ManagerMixin(object):
    # Mixin binding the generic test bases to a SyncManager.  The manager
    # object is allocated here without running __init__; test_main() calls
    # __init__() and start() on it before the tests run.
    TYPE = 'manager'
    Process = multiprocessing.Process
    manager = object.__new__(multiprocessing.managers.SyncManager)
    locals().update(get_attributes(
        manager,
        (
            'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
            'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
            'Namespace', 'JoinableQueue'
        ),
    ))


# Generate the manager-based test cases and publish them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1950
1951
class ThreadsMixin(object):
    # Mixin binding the generic test bases to ``multiprocessing.dummy``:
    # the names listed below are copied from that module into the class
    # namespace via get_attributes().
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    locals().update(get_attributes(
        multiprocessing.dummy,
        (
            'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
            'Condition', 'Event', 'Value', 'Array', 'current_process',
            'active_children', 'Pipe', 'connection', 'dict', 'list',
            'Namespace', 'JoinableQueue'
        ),
    ))


# Generate the thread-based test cases and publish them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1964
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        # The fake peer replies to whatever it is sent with fixed bogus
        # bytes; delivering a challenge to it must fail authentication.
        class _BogusPeer(object):
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        peer = _BogusPeer()
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(peer, b'abc')

    def test_answer_challenge_auth_failure(self):
        # The fake peer first sends the CHALLENGE marker, then bogus bytes,
        # then empty messages; answering must fail authentication.
        class _BogusPeer(object):
            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

            def send_bytes(self, data):
                pass

        peer = _BogusPeer()
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(peer, b'abc')
1993
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00001994#
1995# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1996#
1997
def initializer(ns):
    """Increment the ``test`` attribute of *ns* by one.

    Used as the initializer callable handed to Manager.start() and Pool()
    by TestInitializers.
    """
    ns.test = ns.test + 1
2000
class TestInitializers(unittest.TestCase):
    # Exercises the ``initializer`` argument of Manager.start() and
    # Pool.__init__() (issue 5585).  A manager-hosted Namespace with a
    # ``test`` counter is shared with the initializer so the parent can
    # observe that it ran.

    def setUp(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the extra manager down even if the assertion fails,
            # otherwise its server process would be left running.
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        # One worker was requested, so the counter is expected to be 1.
        self.assertEqual(self.ns.test, 1)
2023
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002024#
2025# Issue 5155, 5313, 5331: Test process in processes
2026# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2027#
2028
2029def _ThisSubProcess(q):
2030 try:
2031 item = q.get(block=False)
2032 except pyqueue.Empty:
2033 pass
2034
def _TestProcess(q):
    """Start a nested process that polls a fresh queue, and wait for it.

    The *q* argument is accepted but not used; the nested process gets a
    queue created here.
    """
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(
        target=_ThisSubProcess,
        args=(inner_queue,),
    )
    child.start()
    child.join()
2040
2041def _afunc(x):
2042 return x*x
2043
def pool_in_process():
    """Create a 4-worker pool, map ``_afunc`` over a few ints, and clean up.

    Meant to be run as the target of a child process.  The mapped result
    is discarded; only successful completion matters.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Release the workers explicitly instead of leaving them to be
        # reaped whenever the pool object happens to be collected.
        pool.close()
        pool.join()
2047
2048class _file_like(object):
2049 def __init__(self, delegate):
2050 self._delegate = delegate
2051 self._pid = None
2052
2053 @property
2054 def cache(self):
2055 pid = os.getpid()
2056 # There are no race conditions since fork keeps only the running thread
2057 if pid != self._pid:
2058 self._pid = pid
2059 self._cache = []
2060 return self._cache
2061
2062 def write(self, data):
2063 self.cache.append(data)
2064
2065 def flush(self):
2066 self._delegate.write(''.join(self.cache))
2067 self._cache = []
2068
class TestStdinBadfiledescriptor(unittest.TestCase):
    # Issue 5155, 5313, 5331: processes started from within processes.

    def test_queue_in_process(self):
        # A child process that itself creates a queue and a grandchild
        # must run to completion.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # A child process that creates its own Pool must run to completion.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this process object is created but never started,
        # so only the parent-side flush below is exercised -- confirm
        # whether starting it was intended.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # assertEqual rather than a bare assert: the check survives
        # ``python -O`` and reports both values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
2089
# Stand-alone test cases that are not generated from the _Test* bases.
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002092
Benjamin Petersone711caf2008-06-11 16:44:04 +00002093#
2094#
2095#
2096
def test_main(run=None):
    """Set up shared pools/manager, run every test case, then tear down.

    Arguments:
        run: callable taking a ``unittest.TestSuite``.  When None,
            ``test.support.run_unittest`` is used.

    Raises:
        unittest.SkipTest: on Linux, when creating a multiprocessing
            RLock raises OSError (see issue 3111).
    """
    if sys.platform.startswith("linux"):
        try:
            # Probe only: the lock itself is not needed afterwards.
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared fixtures used by the generated test cases through the mixins.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # Tear the fixtures down even when the run raises, so worker and
        # manager processes are not left behind.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002134
def main():
    """Run the whole suite through a verbose text test runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)


if __name__ == '__main__':
    main()