blob: 6d12d12cd57cb78b0e7bae47879ce7000d1c18b8 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import queue as pyqueue
10import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000011import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
15import signal
16import array
17import copy
18import socket
19import random
20import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000021import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000022
Benjamin Petersone5384b02008-10-04 22:00:42 +000023
R. David Murraya21e4ca2009-03-31 23:16:50 +000024# Skip tests if _multiprocessing wasn't built.
25_multiprocessing = test.support.import_module('_multiprocessing')
26# Skip tests if sem_open implementation is broken.
27test.support.import_module('multiprocessing.synchronize')
Benjamin Petersone5384b02008-10-04 22:00:42 +000028
Benjamin Petersone711caf2008-06-11 16:44:04 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000033import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000034
35from multiprocessing import util
36
37#
38#
39#
40
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000041def latin(s):
42 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000043
Benjamin Petersone711caf2008-06-11 16:44:04 +000044#
45# Constants
46#
47
48LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000049#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000050
51DELTA = 0.1
52CHECK_TIMINGS = False # making true makes tests take a lot longer
53 # and can sometimes cause some non-serious
54 # failures because some calls block a bit
55 # longer than expected
56if CHECK_TIMINGS:
57 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
58else:
59 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
60
61HAVE_GETVALUE = not getattr(_multiprocessing,
62 'HAVE_BROKEN_SEM_GETVALUE', False)
63
Jesse Noller6214edd2009-01-19 16:23:53 +000064WIN32 = (sys.platform == "win32")
65
Benjamin Petersone711caf2008-06-11 16:44:04 +000066#
Florent Xiclunafd1b0932010-03-28 00:25:02 +000067# Some tests require ctypes
68#
69
70try:
71 from ctypes import Structure, Value, copy, c_int, c_double
72except ImportError:
73 Structure = object
74 c_int = c_double = None
75
76#
Benjamin Petersone711caf2008-06-11 16:44:04 +000077# Creates a wrapper for a function which records the time it takes to finish
78#
79
80class TimingWrapper(object):
81
82 def __init__(self, func):
83 self.func = func
84 self.elapsed = None
85
86 def __call__(self, *args, **kwds):
87 t = time.time()
88 try:
89 return self.func(*args, **kwds)
90 finally:
91 self.elapsed = time.time() - t
92
93#
94# Base class for test cases
95#
96
97class BaseTestCase(object):
98
99 ALLOWED_TYPES = ('processes', 'manager', 'threads')
100
101 def assertTimingAlmostEqual(self, a, b):
102 if CHECK_TIMINGS:
103 self.assertAlmostEqual(a, b, 1)
104
105 def assertReturnsIfImplemented(self, value, func, *args):
106 try:
107 res = func(*args)
108 except NotImplementedError:
109 pass
110 else:
111 return self.assertEqual(value, res)
112
113#
114# Return the value of a semaphore
115#
116
117def get_value(self):
118 try:
119 return self.get_value()
120 except AttributeError:
121 try:
122 return self._Semaphore__value
123 except AttributeError:
124 try:
125 return self._value
126 except AttributeError:
127 raise NotImplementedError
128
129#
130# Testcases
131#
132
133class _TestProcess(BaseTestCase):
134
135 ALLOWED_TYPES = ('processes', 'threads')
136
137 def test_current(self):
138 if self.TYPE == 'threads':
139 return
140
141 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000142 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000143
144 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000145 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000146 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000147 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000148 self.assertEqual(current.ident, os.getpid())
149 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000150
151 def _test(self, q, *args, **kwds):
152 current = self.current_process()
153 q.put(args)
154 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000155 q.put(current.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000156 if self.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000157 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000158 q.put(current.pid)
159
160 def test_process(self):
161 q = self.Queue(1)
162 e = self.Event()
163 args = (q, 1, 2)
164 kwargs = {'hello':23, 'bye':2.54}
165 name = 'SomeProcess'
166 p = self.Process(
167 target=self._test, args=args, kwargs=kwargs, name=name
168 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000169 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000170 current = self.current_process()
171
172 if self.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000173 self.assertEquals(p.authkey, current.authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000174 self.assertEquals(p.is_alive(), False)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000175 self.assertEquals(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000176 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000177 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000178 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000179
180 p.start()
181
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000182 self.assertEquals(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000183 self.assertEquals(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000184 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000185
186 self.assertEquals(q.get(), args[1:])
187 self.assertEquals(q.get(), kwargs)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000188 self.assertEquals(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000189 if self.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000190 self.assertEquals(q.get(), current.authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000191 self.assertEquals(q.get(), p.pid)
192
193 p.join()
194
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000195 self.assertEquals(p.exitcode, 0)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000196 self.assertEquals(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000197 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000198
199 def _test_terminate(self):
200 time.sleep(1000)
201
202 def test_terminate(self):
203 if self.TYPE == 'threads':
204 return
205
206 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000207 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000208 p.start()
209
210 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000211 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000212 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000213
214 p.terminate()
215
216 join = TimingWrapper(p.join)
217 self.assertEqual(join(), None)
218 self.assertTimingAlmostEqual(join.elapsed, 0.0)
219
220 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000221 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000222
223 p.join()
224
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000225 # XXX sometimes get p.exitcode == 0 on Windows ...
226 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000227
228 def test_cpu_count(self):
229 try:
230 cpus = multiprocessing.cpu_count()
231 except NotImplementedError:
232 cpus = 1
233 self.assertTrue(type(cpus) is int)
234 self.assertTrue(cpus >= 1)
235
236 def test_active_children(self):
237 self.assertEqual(type(self.active_children()), list)
238
239 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000240 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000241
242 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000243 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000244
245 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000246 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000247
248 def _test_recursion(self, wconn, id):
249 from multiprocessing import forking
250 wconn.send(id)
251 if len(id) < 2:
252 for i in range(2):
253 p = self.Process(
254 target=self._test_recursion, args=(wconn, id+[i])
255 )
256 p.start()
257 p.join()
258
259 def test_recursion(self):
260 rconn, wconn = self.Pipe(duplex=False)
261 self._test_recursion(wconn, [])
262
263 time.sleep(DELTA)
264 result = []
265 while rconn.poll():
266 result.append(rconn.recv())
267
268 expected = [
269 [],
270 [0],
271 [0, 0],
272 [0, 1],
273 [1],
274 [1, 0],
275 [1, 1]
276 ]
277 self.assertEqual(result, expected)
278
279#
280#
281#
282
283class _UpperCaser(multiprocessing.Process):
284
285 def __init__(self):
286 multiprocessing.Process.__init__(self)
287 self.child_conn, self.parent_conn = multiprocessing.Pipe()
288
289 def run(self):
290 self.parent_conn.close()
291 for s in iter(self.child_conn.recv, None):
292 self.child_conn.send(s.upper())
293 self.child_conn.close()
294
295 def submit(self, s):
296 assert type(s) is str
297 self.parent_conn.send(s)
298 return self.parent_conn.recv()
299
300 def stop(self):
301 self.parent_conn.send(None)
302 self.parent_conn.close()
303 self.child_conn.close()
304
305class _TestSubclassingProcess(BaseTestCase):
306
307 ALLOWED_TYPES = ('processes',)
308
309 def test_subclassing(self):
310 uppercaser = _UpperCaser()
311 uppercaser.start()
312 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
313 self.assertEqual(uppercaser.submit('world'), 'WORLD')
314 uppercaser.stop()
315 uppercaser.join()
316
317#
318#
319#
320
321def queue_empty(q):
322 if hasattr(q, 'empty'):
323 return q.empty()
324 else:
325 return q.qsize() == 0
326
327def queue_full(q, maxsize):
328 if hasattr(q, 'full'):
329 return q.full()
330 else:
331 return q.qsize() == maxsize
332
333
334class _TestQueue(BaseTestCase):
335
336
337 def _test_put(self, queue, child_can_start, parent_can_continue):
338 child_can_start.wait()
339 for i in range(6):
340 queue.get()
341 parent_can_continue.set()
342
343 def test_put(self):
344 MAXSIZE = 6
345 queue = self.Queue(maxsize=MAXSIZE)
346 child_can_start = self.Event()
347 parent_can_continue = self.Event()
348
349 proc = self.Process(
350 target=self._test_put,
351 args=(queue, child_can_start, parent_can_continue)
352 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000353 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000354 proc.start()
355
356 self.assertEqual(queue_empty(queue), True)
357 self.assertEqual(queue_full(queue, MAXSIZE), False)
358
359 queue.put(1)
360 queue.put(2, True)
361 queue.put(3, True, None)
362 queue.put(4, False)
363 queue.put(5, False, None)
364 queue.put_nowait(6)
365
366 # the values may be in buffer but not yet in pipe so sleep a bit
367 time.sleep(DELTA)
368
369 self.assertEqual(queue_empty(queue), False)
370 self.assertEqual(queue_full(queue, MAXSIZE), True)
371
372 put = TimingWrapper(queue.put)
373 put_nowait = TimingWrapper(queue.put_nowait)
374
375 self.assertRaises(pyqueue.Full, put, 7, False)
376 self.assertTimingAlmostEqual(put.elapsed, 0)
377
378 self.assertRaises(pyqueue.Full, put, 7, False, None)
379 self.assertTimingAlmostEqual(put.elapsed, 0)
380
381 self.assertRaises(pyqueue.Full, put_nowait, 7)
382 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
383
384 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
385 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
386
387 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
388 self.assertTimingAlmostEqual(put.elapsed, 0)
389
390 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
391 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
392
393 child_can_start.set()
394 parent_can_continue.wait()
395
396 self.assertEqual(queue_empty(queue), True)
397 self.assertEqual(queue_full(queue, MAXSIZE), False)
398
399 proc.join()
400
401 def _test_get(self, queue, child_can_start, parent_can_continue):
402 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000403 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000404 queue.put(2)
405 queue.put(3)
406 queue.put(4)
407 queue.put(5)
408 parent_can_continue.set()
409
410 def test_get(self):
411 queue = self.Queue()
412 child_can_start = self.Event()
413 parent_can_continue = self.Event()
414
415 proc = self.Process(
416 target=self._test_get,
417 args=(queue, child_can_start, parent_can_continue)
418 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000419 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000420 proc.start()
421
422 self.assertEqual(queue_empty(queue), True)
423
424 child_can_start.set()
425 parent_can_continue.wait()
426
427 time.sleep(DELTA)
428 self.assertEqual(queue_empty(queue), False)
429
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000430 # Hangs unexpectedly, remove for now
431 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000432 self.assertEqual(queue.get(True, None), 2)
433 self.assertEqual(queue.get(True), 3)
434 self.assertEqual(queue.get(timeout=1), 4)
435 self.assertEqual(queue.get_nowait(), 5)
436
437 self.assertEqual(queue_empty(queue), True)
438
439 get = TimingWrapper(queue.get)
440 get_nowait = TimingWrapper(queue.get_nowait)
441
442 self.assertRaises(pyqueue.Empty, get, False)
443 self.assertTimingAlmostEqual(get.elapsed, 0)
444
445 self.assertRaises(pyqueue.Empty, get, False, None)
446 self.assertTimingAlmostEqual(get.elapsed, 0)
447
448 self.assertRaises(pyqueue.Empty, get_nowait)
449 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
450
451 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
452 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
453
454 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
455 self.assertTimingAlmostEqual(get.elapsed, 0)
456
457 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
458 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
459
460 proc.join()
461
462 def _test_fork(self, queue):
463 for i in range(10, 20):
464 queue.put(i)
465 # note that at this point the items may only be buffered, so the
466 # process cannot shutdown until the feeder thread has finished
467 # pushing items onto the pipe.
468
469 def test_fork(self):
470 # Old versions of Queue would fail to create a new feeder
471 # thread for a forked process if the original process had its
472 # own feeder thread. This test checks that this no longer
473 # happens.
474
475 queue = self.Queue()
476
477 # put items on queue so that main process starts a feeder thread
478 for i in range(10):
479 queue.put(i)
480
481 # wait to make sure thread starts before we fork a new process
482 time.sleep(DELTA)
483
484 # fork process
485 p = self.Process(target=self._test_fork, args=(queue,))
486 p.start()
487
488 # check that all expected items are in the queue
489 for i in range(20):
490 self.assertEqual(queue.get(), i)
491 self.assertRaises(pyqueue.Empty, queue.get, False)
492
493 p.join()
494
495 def test_qsize(self):
496 q = self.Queue()
497 try:
498 self.assertEqual(q.qsize(), 0)
499 except NotImplementedError:
500 return
501 q.put(1)
502 self.assertEqual(q.qsize(), 1)
503 q.put(5)
504 self.assertEqual(q.qsize(), 2)
505 q.get()
506 self.assertEqual(q.qsize(), 1)
507 q.get()
508 self.assertEqual(q.qsize(), 0)
509
510 def _test_task_done(self, q):
511 for obj in iter(q.get, None):
512 time.sleep(DELTA)
513 q.task_done()
514
515 def test_task_done(self):
516 queue = self.JoinableQueue()
517
518 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000519 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000520
521 workers = [self.Process(target=self._test_task_done, args=(queue,))
522 for i in range(4)]
523
524 for p in workers:
525 p.start()
526
527 for i in range(10):
528 queue.put(i)
529
530 queue.join()
531
532 for p in workers:
533 queue.put(None)
534
535 for p in workers:
536 p.join()
537
538#
539#
540#
541
542class _TestLock(BaseTestCase):
543
544 def test_lock(self):
545 lock = self.Lock()
546 self.assertEqual(lock.acquire(), True)
547 self.assertEqual(lock.acquire(False), False)
548 self.assertEqual(lock.release(), None)
549 self.assertRaises((ValueError, threading.ThreadError), lock.release)
550
551 def test_rlock(self):
552 lock = self.RLock()
553 self.assertEqual(lock.acquire(), True)
554 self.assertEqual(lock.acquire(), True)
555 self.assertEqual(lock.acquire(), True)
556 self.assertEqual(lock.release(), None)
557 self.assertEqual(lock.release(), None)
558 self.assertEqual(lock.release(), None)
559 self.assertRaises((AssertionError, RuntimeError), lock.release)
560
Jesse Nollerf8d00852009-03-31 03:25:07 +0000561 def test_lock_context(self):
562 with self.Lock():
563 pass
564
Benjamin Petersone711caf2008-06-11 16:44:04 +0000565
566class _TestSemaphore(BaseTestCase):
567
568 def _test_semaphore(self, sem):
569 self.assertReturnsIfImplemented(2, get_value, sem)
570 self.assertEqual(sem.acquire(), True)
571 self.assertReturnsIfImplemented(1, get_value, sem)
572 self.assertEqual(sem.acquire(), True)
573 self.assertReturnsIfImplemented(0, get_value, sem)
574 self.assertEqual(sem.acquire(False), False)
575 self.assertReturnsIfImplemented(0, get_value, sem)
576 self.assertEqual(sem.release(), None)
577 self.assertReturnsIfImplemented(1, get_value, sem)
578 self.assertEqual(sem.release(), None)
579 self.assertReturnsIfImplemented(2, get_value, sem)
580
581 def test_semaphore(self):
582 sem = self.Semaphore(2)
583 self._test_semaphore(sem)
584 self.assertEqual(sem.release(), None)
585 self.assertReturnsIfImplemented(3, get_value, sem)
586 self.assertEqual(sem.release(), None)
587 self.assertReturnsIfImplemented(4, get_value, sem)
588
589 def test_bounded_semaphore(self):
590 sem = self.BoundedSemaphore(2)
591 self._test_semaphore(sem)
592 # Currently fails on OS/X
593 #if HAVE_GETVALUE:
594 # self.assertRaises(ValueError, sem.release)
595 # self.assertReturnsIfImplemented(2, get_value, sem)
596
597 def test_timeout(self):
598 if self.TYPE != 'processes':
599 return
600
601 sem = self.Semaphore(0)
602 acquire = TimingWrapper(sem.acquire)
603
604 self.assertEqual(acquire(False), False)
605 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
606
607 self.assertEqual(acquire(False, None), False)
608 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
609
610 self.assertEqual(acquire(False, TIMEOUT1), False)
611 self.assertTimingAlmostEqual(acquire.elapsed, 0)
612
613 self.assertEqual(acquire(True, TIMEOUT2), False)
614 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
615
616 self.assertEqual(acquire(timeout=TIMEOUT3), False)
617 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
618
619
620class _TestCondition(BaseTestCase):
621
622 def f(self, cond, sleeping, woken, timeout=None):
623 cond.acquire()
624 sleeping.release()
625 cond.wait(timeout)
626 woken.release()
627 cond.release()
628
629 def check_invariant(self, cond):
630 # this is only supposed to succeed when there are no sleepers
631 if self.TYPE == 'processes':
632 try:
633 sleepers = (cond._sleeping_count.get_value() -
634 cond._woken_count.get_value())
635 self.assertEqual(sleepers, 0)
636 self.assertEqual(cond._wait_semaphore.get_value(), 0)
637 except NotImplementedError:
638 pass
639
640 def test_notify(self):
641 cond = self.Condition()
642 sleeping = self.Semaphore(0)
643 woken = self.Semaphore(0)
644
645 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000646 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000647 p.start()
648
649 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000650 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000651 p.start()
652
653 # wait for both children to start sleeping
654 sleeping.acquire()
655 sleeping.acquire()
656
657 # check no process/thread has woken up
658 time.sleep(DELTA)
659 self.assertReturnsIfImplemented(0, get_value, woken)
660
661 # wake up one process/thread
662 cond.acquire()
663 cond.notify()
664 cond.release()
665
666 # check one process/thread has woken up
667 time.sleep(DELTA)
668 self.assertReturnsIfImplemented(1, get_value, woken)
669
670 # wake up another
671 cond.acquire()
672 cond.notify()
673 cond.release()
674
675 # check other has woken up
676 time.sleep(DELTA)
677 self.assertReturnsIfImplemented(2, get_value, woken)
678
679 # check state is not mucked up
680 self.check_invariant(cond)
681 p.join()
682
683 def test_notify_all(self):
684 cond = self.Condition()
685 sleeping = self.Semaphore(0)
686 woken = self.Semaphore(0)
687
688 # start some threads/processes which will timeout
689 for i in range(3):
690 p = self.Process(target=self.f,
691 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000692 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000693 p.start()
694
695 t = threading.Thread(target=self.f,
696 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000697 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000698 t.start()
699
700 # wait for them all to sleep
701 for i in range(6):
702 sleeping.acquire()
703
704 # check they have all timed out
705 for i in range(6):
706 woken.acquire()
707 self.assertReturnsIfImplemented(0, get_value, woken)
708
709 # check state is not mucked up
710 self.check_invariant(cond)
711
712 # start some more threads/processes
713 for i in range(3):
714 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000715 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000716 p.start()
717
718 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000719 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000720 t.start()
721
722 # wait for them to all sleep
723 for i in range(6):
724 sleeping.acquire()
725
726 # check no process/thread has woken up
727 time.sleep(DELTA)
728 self.assertReturnsIfImplemented(0, get_value, woken)
729
730 # wake them all up
731 cond.acquire()
732 cond.notify_all()
733 cond.release()
734
735 # check they have all woken
736 time.sleep(DELTA)
737 self.assertReturnsIfImplemented(6, get_value, woken)
738
739 # check state is not mucked up
740 self.check_invariant(cond)
741
742 def test_timeout(self):
743 cond = self.Condition()
744 wait = TimingWrapper(cond.wait)
745 cond.acquire()
746 res = wait(TIMEOUT1)
747 cond.release()
748 self.assertEqual(res, None)
749 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
750
751
752class _TestEvent(BaseTestCase):
753
754 def _test_event(self, event):
755 time.sleep(TIMEOUT2)
756 event.set()
757
758 def test_event(self):
759 event = self.Event()
760 wait = TimingWrapper(event.wait)
761
762 # Removed temporaily, due to API shear, this does not
763 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +0000764 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000765
Benjamin Peterson965ce872009-04-05 21:24:58 +0000766 # Removed, threading.Event.wait() will return the value of the __flag
767 # instead of None. API Shear with the semaphore backed mp.Event
768 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000769 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000770 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000771 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
772
773 event.set()
774
775 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +0000776 self.assertEqual(event.is_set(), True)
777 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000778 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000779 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000780 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
781 # self.assertEqual(event.is_set(), True)
782
783 event.clear()
784
785 #self.assertEqual(event.is_set(), False)
786
787 self.Process(target=self._test_event, args=(event,)).start()
Benjamin Peterson965ce872009-04-05 21:24:58 +0000788 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000789
790#
791#
792#
793
794class _TestValue(BaseTestCase):
795
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000796 ALLOWED_TYPES = ('processes',)
797
Benjamin Petersone711caf2008-06-11 16:44:04 +0000798 codes_values = [
799 ('i', 4343, 24234),
800 ('d', 3.625, -4.25),
801 ('h', -232, 234),
802 ('c', latin('x'), latin('y'))
803 ]
804
805 def _test(self, values):
806 for sv, cv in zip(values, self.codes_values):
807 sv.value = cv[2]
808
809
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000810 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000811 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000812 if raw:
813 values = [self.RawValue(code, value)
814 for code, value, _ in self.codes_values]
815 else:
816 values = [self.Value(code, value)
817 for code, value, _ in self.codes_values]
818
819 for sv, cv in zip(values, self.codes_values):
820 self.assertEqual(sv.value, cv[1])
821
822 proc = self.Process(target=self._test, args=(values,))
823 proc.start()
824 proc.join()
825
826 for sv, cv in zip(values, self.codes_values):
827 self.assertEqual(sv.value, cv[2])
828
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000829 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000830 def test_rawvalue(self):
831 self.test_value(raw=True)
832
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000833 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000834 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000835 val1 = self.Value('i', 5)
836 lock1 = val1.get_lock()
837 obj1 = val1.get_obj()
838
839 val2 = self.Value('i', 5, lock=None)
840 lock2 = val2.get_lock()
841 obj2 = val2.get_obj()
842
843 lock = self.Lock()
844 val3 = self.Value('i', 5, lock=lock)
845 lock3 = val3.get_lock()
846 obj3 = val3.get_obj()
847 self.assertEqual(lock, lock3)
848
Jesse Nollerb0516a62009-01-18 03:11:38 +0000849 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000850 self.assertFalse(hasattr(arr4, 'get_lock'))
851 self.assertFalse(hasattr(arr4, 'get_obj'))
852
Jesse Nollerb0516a62009-01-18 03:11:38 +0000853 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
854
855 arr5 = self.RawValue('i', 5)
856 self.assertFalse(hasattr(arr5, 'get_lock'))
857 self.assertFalse(hasattr(arr5, 'get_obj'))
858
Benjamin Petersone711caf2008-06-11 16:44:04 +0000859
860class _TestArray(BaseTestCase):
861
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000862 ALLOWED_TYPES = ('processes',)
863
Benjamin Petersone711caf2008-06-11 16:44:04 +0000864 def f(self, seq):
865 for i in range(1, len(seq)):
866 seq[i] += seq[i-1]
867
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000868 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000869 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000870 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
871 if raw:
872 arr = self.RawArray('i', seq)
873 else:
874 arr = self.Array('i', seq)
875
876 self.assertEqual(len(arr), len(seq))
877 self.assertEqual(arr[3], seq[3])
878 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
879
880 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
881
882 self.assertEqual(list(arr[:]), seq)
883
884 self.f(seq)
885
886 p = self.Process(target=self.f, args=(arr,))
887 p.start()
888 p.join()
889
890 self.assertEqual(list(arr[:]), seq)
891
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000892 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000893 def test_rawarray(self):
894 self.test_array(raw=True)
895
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000896 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000897 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000898 arr1 = self.Array('i', list(range(10)))
899 lock1 = arr1.get_lock()
900 obj1 = arr1.get_obj()
901
902 arr2 = self.Array('i', list(range(10)), lock=None)
903 lock2 = arr2.get_lock()
904 obj2 = arr2.get_obj()
905
906 lock = self.Lock()
907 arr3 = self.Array('i', list(range(10)), lock=lock)
908 lock3 = arr3.get_lock()
909 obj3 = arr3.get_obj()
910 self.assertEqual(lock, lock3)
911
Jesse Nollerb0516a62009-01-18 03:11:38 +0000912 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000913 self.assertFalse(hasattr(arr4, 'get_lock'))
914 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +0000915 self.assertRaises(AttributeError,
916 self.Array, 'i', range(10), lock='notalock')
917
918 arr5 = self.RawArray('i', range(10))
919 self.assertFalse(hasattr(arr5, 'get_lock'))
920 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000921
922#
923#
924#
925
926class _TestContainers(BaseTestCase):
927
928 ALLOWED_TYPES = ('manager',)
929
930 def test_list(self):
931 a = self.list(list(range(10)))
932 self.assertEqual(a[:], list(range(10)))
933
934 b = self.list()
935 self.assertEqual(b[:], [])
936
937 b.extend(list(range(5)))
938 self.assertEqual(b[:], list(range(5)))
939
940 self.assertEqual(b[2], 2)
941 self.assertEqual(b[2:10], [2,3,4])
942
943 b *= 2
944 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
945
946 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
947
948 self.assertEqual(a[:], list(range(10)))
949
950 d = [a, b]
951 e = self.list(d)
952 self.assertEqual(
953 e[:],
954 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
955 )
956
957 f = self.list([a])
958 a.append('hello')
959 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
960
961 def test_dict(self):
962 d = self.dict()
963 indices = list(range(65, 70))
964 for i in indices:
965 d[i] = chr(i)
966 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
967 self.assertEqual(sorted(d.keys()), indices)
968 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
969 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
970
971 def test_namespace(self):
972 n = self.Namespace()
973 n.name = 'Bob'
974 n.job = 'Builder'
975 n._hidden = 'hidden'
976 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
977 del n.job
978 self.assertEqual(str(n), "Namespace(name='Bob')")
979 self.assertTrue(hasattr(n, 'name'))
980 self.assertTrue(not hasattr(n, 'job'))
981
982#
983#
984#
985
986def sqr(x, wait=0.0):
987 time.sleep(wait)
988 return x*x
Benjamin Petersone711caf2008-06-11 16:44:04 +0000989class _TestPool(BaseTestCase):
990
991 def test_apply(self):
992 papply = self.pool.apply
993 self.assertEqual(papply(sqr, (5,)), sqr(5))
994 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
995
996 def test_map(self):
997 pmap = self.pool.map
998 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
999 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1000 list(map(sqr, list(range(100)))))
1001
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001002 def test_map_chunksize(self):
1003 try:
1004 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1005 except multiprocessing.TimeoutError:
1006 self.fail("pool.map_async with chunksize stalled on null list")
1007
Benjamin Petersone711caf2008-06-11 16:44:04 +00001008 def test_async(self):
1009 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1010 get = TimingWrapper(res.get)
1011 self.assertEqual(get(), 49)
1012 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1013
1014 def test_async_timeout(self):
1015 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1016 get = TimingWrapper(res.get)
1017 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1018 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1019
1020 def test_imap(self):
1021 it = self.pool.imap(sqr, list(range(10)))
1022 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1023
1024 it = self.pool.imap(sqr, list(range(10)))
1025 for i in range(10):
1026 self.assertEqual(next(it), i*i)
1027 self.assertRaises(StopIteration, it.__next__)
1028
1029 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1030 for i in range(1000):
1031 self.assertEqual(next(it), i*i)
1032 self.assertRaises(StopIteration, it.__next__)
1033
1034 def test_imap_unordered(self):
1035 it = self.pool.imap_unordered(sqr, list(range(1000)))
1036 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1037
1038 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1039 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1040
1041 def test_make_pool(self):
1042 p = multiprocessing.Pool(3)
1043 self.assertEqual(3, len(p._pool))
1044 p.close()
1045 p.join()
1046
1047 def test_terminate(self):
1048 if self.TYPE == 'manager':
1049 # On Unix a forked process increfs each shared object to
1050 # which its parent process held a reference. If the
1051 # forked process gets terminated then there is likely to
1052 # be a reference leak. So to prevent
1053 # _TestZZZNumberOfObjects from failing we skip this test
1054 # when using a manager.
1055 return
1056
1057 result = self.pool.map_async(
1058 time.sleep, [0.1 for i in range(10000)], chunksize=1
1059 )
1060 self.pool.terminate()
1061 join = TimingWrapper(self.pool.join)
1062 join()
1063 self.assertTrue(join.elapsed < 0.2)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001064
1065class _TestPoolWorkerLifetime(BaseTestCase):
1066
1067 ALLOWED_TYPES = ('processes', )
1068 def test_pool_worker_lifetime(self):
1069 p = multiprocessing.Pool(3, maxtasksperchild=10)
1070 self.assertEqual(3, len(p._pool))
1071 origworkerpids = [w.pid for w in p._pool]
1072 # Run many tasks so each worker gets replaced (hopefully)
1073 results = []
1074 for i in range(100):
1075 results.append(p.apply_async(sqr, (i, )))
1076 # Fetch the results and verify we got the right answers,
1077 # also ensuring all the tasks have completed.
1078 for (j, res) in enumerate(results):
1079 self.assertEqual(res.get(), sqr(j))
1080 # Refill the pool
1081 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001082 # Wait until all workers are alive
1083 countdown = 5
1084 while countdown and not all(w.is_alive() for w in p._pool):
1085 countdown -= 1
1086 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001087 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001088 # All pids should be assigned. See issue #7805.
1089 self.assertNotIn(None, origworkerpids)
1090 self.assertNotIn(None, finalworkerpids)
1091 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001092 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1093 p.close()
1094 p.join()
1095
Benjamin Petersone711caf2008-06-11 16:44:04 +00001096#
1097# Test that manager has expected number of shared objects left
1098#
1099
1100class _TestZZZNumberOfObjects(BaseTestCase):
1101 # Because test cases are sorted alphabetically, this one will get
1102 # run after all the other tests for the manager. It tests that
1103 # there have been no "reference leaks" for the manager's shared
1104 # objects. Note the comment in _TestPool.test_terminate().
1105 ALLOWED_TYPES = ('manager',)
1106
1107 def test_number_of_objects(self):
1108 EXPECTED_NUMBER = 1 # the pool object is still alive
1109 multiprocessing.active_children() # discard dead process objs
1110 gc.collect() # do garbage collection
1111 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001112 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001113 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001114 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001115 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001116
1117 self.assertEqual(refs, EXPECTED_NUMBER)
1118
1119#
1120# Test of creating a customized manager class
1121#
1122
1123from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1124
1125class FooBar(object):
1126 def f(self):
1127 return 'f()'
1128 def g(self):
1129 raise ValueError
1130 def _h(self):
1131 return '_h()'
1132
1133def baz():
1134 for i in range(10):
1135 yield i*i
1136
1137class IteratorProxy(BaseProxy):
1138 _exposed_ = ('next', '__next__')
1139 def __iter__(self):
1140 return self
1141 def __next__(self):
1142 return self._callmethod('next')
1143 def __next__(self):
1144 return self._callmethod('__next__')
1145
1146class MyManager(BaseManager):
1147 pass
1148
1149MyManager.register('Foo', callable=FooBar)
1150MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1151MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1152
1153
1154class _TestMyManager(BaseTestCase):
1155
1156 ALLOWED_TYPES = ('manager',)
1157
1158 def test_mymanager(self):
1159 manager = MyManager()
1160 manager.start()
1161
1162 foo = manager.Foo()
1163 bar = manager.Bar()
1164 baz = manager.baz()
1165
1166 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1167 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1168
1169 self.assertEqual(foo_methods, ['f', 'g'])
1170 self.assertEqual(bar_methods, ['f', '_h'])
1171
1172 self.assertEqual(foo.f(), 'f()')
1173 self.assertRaises(ValueError, foo.g)
1174 self.assertEqual(foo._callmethod('f'), 'f()')
1175 self.assertRaises(RemoteError, foo._callmethod, '_h')
1176
1177 self.assertEqual(bar.f(), 'f()')
1178 self.assertEqual(bar._h(), '_h()')
1179 self.assertEqual(bar._callmethod('f'), 'f()')
1180 self.assertEqual(bar._callmethod('_h'), '_h()')
1181
1182 self.assertEqual(list(baz), [i*i for i in range(10)])
1183
1184 manager.shutdown()
1185
1186#
1187# Test of connecting to a remote server and using xmlrpclib for serialization
1188#
1189
1190_queue = pyqueue.Queue()
1191def get_queue():
1192 return _queue
1193
1194class QueueManager(BaseManager):
1195 '''manager class used by server process'''
1196QueueManager.register('get_queue', callable=get_queue)
1197
1198class QueueManager2(BaseManager):
1199 '''manager class which specifies the same interface as QueueManager'''
1200QueueManager2.register('get_queue')
1201
1202
1203SERIALIZER = 'xmlrpclib'
1204
1205class _TestRemoteManager(BaseTestCase):
1206
1207 ALLOWED_TYPES = ('manager',)
1208
1209 def _putter(self, address, authkey):
1210 manager = QueueManager2(
1211 address=address, authkey=authkey, serializer=SERIALIZER
1212 )
1213 manager.connect()
1214 queue = manager.get_queue()
1215 queue.put(('hello world', None, True, 2.25))
1216
1217 def test_remote(self):
1218 authkey = os.urandom(32)
1219
1220 manager = QueueManager(
1221 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1222 )
1223 manager.start()
1224
1225 p = self.Process(target=self._putter, args=(manager.address, authkey))
1226 p.start()
1227
1228 manager2 = QueueManager2(
1229 address=manager.address, authkey=authkey, serializer=SERIALIZER
1230 )
1231 manager2.connect()
1232 queue = manager2.get_queue()
1233
1234 # Note that xmlrpclib will deserialize object as a list not a tuple
1235 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1236
1237 # Because we are using xmlrpclib for serialization instead of
1238 # pickle this will cause a serialization error.
1239 self.assertRaises(Exception, queue.put, time.sleep)
1240
1241 # Make queue finalizer run before the server is stopped
1242 del queue
1243 manager.shutdown()
1244
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001245class _TestManagerRestart(BaseTestCase):
1246
1247 def _putter(self, address, authkey):
1248 manager = QueueManager(
1249 address=address, authkey=authkey, serializer=SERIALIZER)
1250 manager.connect()
1251 queue = manager.get_queue()
1252 queue.put('hello world')
1253
1254 def test_rapid_restart(self):
1255 authkey = os.urandom(32)
R. David Murray83396232009-12-14 22:45:15 +00001256 port = test.support.find_unused_port()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001257 manager = QueueManager(
R. David Murray83396232009-12-14 22:45:15 +00001258 address=('localhost', port), authkey=authkey, serializer=SERIALIZER)
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001259 manager.start()
1260
1261 p = self.Process(target=self._putter, args=(manager.address, authkey))
1262 p.start()
1263 queue = manager.get_queue()
1264 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001265 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001266 manager.shutdown()
1267 manager = QueueManager(
R. David Murray83396232009-12-14 22:45:15 +00001268 address=('localhost', port), authkey=authkey, serializer=SERIALIZER)
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001269 manager.start()
Jesse Noller35d1f002009-03-30 22:59:27 +00001270 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001271
Benjamin Petersone711caf2008-06-11 16:44:04 +00001272#
1273#
1274#
1275
1276SENTINEL = latin('')
1277
1278class _TestConnection(BaseTestCase):
1279
1280 ALLOWED_TYPES = ('processes', 'threads')
1281
1282 def _echo(self, conn):
1283 for msg in iter(conn.recv_bytes, SENTINEL):
1284 conn.send_bytes(msg)
1285 conn.close()
1286
1287 def test_connection(self):
1288 conn, child_conn = self.Pipe()
1289
1290 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001291 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001292 p.start()
1293
1294 seq = [1, 2.25, None]
1295 msg = latin('hello world')
1296 longmsg = msg * 10
1297 arr = array.array('i', list(range(4)))
1298
1299 if self.TYPE == 'processes':
1300 self.assertEqual(type(conn.fileno()), int)
1301
1302 self.assertEqual(conn.send(seq), None)
1303 self.assertEqual(conn.recv(), seq)
1304
1305 self.assertEqual(conn.send_bytes(msg), None)
1306 self.assertEqual(conn.recv_bytes(), msg)
1307
1308 if self.TYPE == 'processes':
1309 buffer = array.array('i', [0]*10)
1310 expected = list(arr) + [0] * (10 - len(arr))
1311 self.assertEqual(conn.send_bytes(arr), None)
1312 self.assertEqual(conn.recv_bytes_into(buffer),
1313 len(arr) * buffer.itemsize)
1314 self.assertEqual(list(buffer), expected)
1315
1316 buffer = array.array('i', [0]*10)
1317 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1318 self.assertEqual(conn.send_bytes(arr), None)
1319 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1320 len(arr) * buffer.itemsize)
1321 self.assertEqual(list(buffer), expected)
1322
1323 buffer = bytearray(latin(' ' * 40))
1324 self.assertEqual(conn.send_bytes(longmsg), None)
1325 try:
1326 res = conn.recv_bytes_into(buffer)
1327 except multiprocessing.BufferTooShort as e:
1328 self.assertEqual(e.args, (longmsg,))
1329 else:
1330 self.fail('expected BufferTooShort, got %s' % res)
1331
1332 poll = TimingWrapper(conn.poll)
1333
1334 self.assertEqual(poll(), False)
1335 self.assertTimingAlmostEqual(poll.elapsed, 0)
1336
1337 self.assertEqual(poll(TIMEOUT1), False)
1338 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1339
1340 conn.send(None)
1341
1342 self.assertEqual(poll(TIMEOUT1), True)
1343 self.assertTimingAlmostEqual(poll.elapsed, 0)
1344
1345 self.assertEqual(conn.recv(), None)
1346
1347 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1348 conn.send_bytes(really_big_msg)
1349 self.assertEqual(conn.recv_bytes(), really_big_msg)
1350
1351 conn.send_bytes(SENTINEL) # tell child to quit
1352 child_conn.close()
1353
1354 if self.TYPE == 'processes':
1355 self.assertEqual(conn.readable, True)
1356 self.assertEqual(conn.writable, True)
1357 self.assertRaises(EOFError, conn.recv)
1358 self.assertRaises(EOFError, conn.recv_bytes)
1359
1360 p.join()
1361
1362 def test_duplex_false(self):
1363 reader, writer = self.Pipe(duplex=False)
1364 self.assertEqual(writer.send(1), None)
1365 self.assertEqual(reader.recv(), 1)
1366 if self.TYPE == 'processes':
1367 self.assertEqual(reader.readable, True)
1368 self.assertEqual(reader.writable, False)
1369 self.assertEqual(writer.readable, False)
1370 self.assertEqual(writer.writable, True)
1371 self.assertRaises(IOError, reader.send, 2)
1372 self.assertRaises(IOError, writer.recv)
1373 self.assertRaises(IOError, writer.poll)
1374
1375 def test_spawn_close(self):
1376 # We test that a pipe connection can be closed by parent
1377 # process immediately after child is spawned. On Windows this
1378 # would have sometimes failed on old versions because
1379 # child_conn would be closed before the child got a chance to
1380 # duplicate it.
1381 conn, child_conn = self.Pipe()
1382
1383 p = self.Process(target=self._echo, args=(child_conn,))
1384 p.start()
1385 child_conn.close() # this might complete before child initializes
1386
1387 msg = latin('hello')
1388 conn.send_bytes(msg)
1389 self.assertEqual(conn.recv_bytes(), msg)
1390
1391 conn.send_bytes(SENTINEL)
1392 conn.close()
1393 p.join()
1394
1395 def test_sendbytes(self):
1396 if self.TYPE != 'processes':
1397 return
1398
1399 msg = latin('abcdefghijklmnopqrstuvwxyz')
1400 a, b = self.Pipe()
1401
1402 a.send_bytes(msg)
1403 self.assertEqual(b.recv_bytes(), msg)
1404
1405 a.send_bytes(msg, 5)
1406 self.assertEqual(b.recv_bytes(), msg[5:])
1407
1408 a.send_bytes(msg, 7, 8)
1409 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1410
1411 a.send_bytes(msg, 26)
1412 self.assertEqual(b.recv_bytes(), latin(''))
1413
1414 a.send_bytes(msg, 26, 0)
1415 self.assertEqual(b.recv_bytes(), latin(''))
1416
1417 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1418
1419 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1420
1421 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1422
1423 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1424
1425 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1426
Benjamin Petersone711caf2008-06-11 16:44:04 +00001427class _TestListenerClient(BaseTestCase):
1428
1429 ALLOWED_TYPES = ('processes', 'threads')
1430
1431 def _test(self, address):
1432 conn = self.connection.Client(address)
1433 conn.send('hello')
1434 conn.close()
1435
1436 def test_listener_client(self):
1437 for family in self.connection.families:
1438 l = self.connection.Listener(family=family)
1439 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001440 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001441 p.start()
1442 conn = l.accept()
1443 self.assertEqual(conn.recv(), 'hello')
1444 p.join()
1445 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001446#
1447# Test of sending connection and socket objects between processes
1448#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001449"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001450class _TestPicklingConnections(BaseTestCase):
1451
1452 ALLOWED_TYPES = ('processes',)
1453
1454 def _listener(self, conn, families):
1455 for fam in families:
1456 l = self.connection.Listener(family=fam)
1457 conn.send(l.address)
1458 new_conn = l.accept()
1459 conn.send(new_conn)
1460
1461 if self.TYPE == 'processes':
1462 l = socket.socket()
1463 l.bind(('localhost', 0))
1464 conn.send(l.getsockname())
1465 l.listen(1)
1466 new_conn, addr = l.accept()
1467 conn.send(new_conn)
1468
1469 conn.recv()
1470
1471 def _remote(self, conn):
1472 for (address, msg) in iter(conn.recv, None):
1473 client = self.connection.Client(address)
1474 client.send(msg.upper())
1475 client.close()
1476
1477 if self.TYPE == 'processes':
1478 address, msg = conn.recv()
1479 client = socket.socket()
1480 client.connect(address)
1481 client.sendall(msg.upper())
1482 client.close()
1483
1484 conn.close()
1485
1486 def test_pickling(self):
1487 try:
1488 multiprocessing.allow_connection_pickling()
1489 except ImportError:
1490 return
1491
1492 families = self.connection.families
1493
1494 lconn, lconn0 = self.Pipe()
1495 lp = self.Process(target=self._listener, args=(lconn0, families))
1496 lp.start()
1497 lconn0.close()
1498
1499 rconn, rconn0 = self.Pipe()
1500 rp = self.Process(target=self._remote, args=(rconn0,))
1501 rp.start()
1502 rconn0.close()
1503
1504 for fam in families:
1505 msg = ('This connection uses family %s' % fam).encode('ascii')
1506 address = lconn.recv()
1507 rconn.send((address, msg))
1508 new_conn = lconn.recv()
1509 self.assertEqual(new_conn.recv(), msg.upper())
1510
1511 rconn.send(None)
1512
1513 if self.TYPE == 'processes':
1514 msg = latin('This connection uses a normal socket')
1515 address = lconn.recv()
1516 rconn.send((address, msg))
1517 if hasattr(socket, 'fromfd'):
1518 new_conn = lconn.recv()
1519 self.assertEqual(new_conn.recv(100), msg.upper())
1520 else:
1521 # XXX On Windows with Py2.6 need to backport fromfd()
1522 discard = lconn.recv_bytes()
1523
1524 lconn.send(None)
1525
1526 rconn.close()
1527 lconn.close()
1528
1529 lp.join()
1530 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001531"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001532#
1533#
1534#
1535
1536class _TestHeap(BaseTestCase):
1537
1538 ALLOWED_TYPES = ('processes',)
1539
1540 def test_heap(self):
1541 iterations = 5000
1542 maxblocks = 50
1543 blocks = []
1544
1545 # create and destroy lots of blocks of different sizes
1546 for i in range(iterations):
1547 size = int(random.lognormvariate(0, 1) * 1000)
1548 b = multiprocessing.heap.BufferWrapper(size)
1549 blocks.append(b)
1550 if len(blocks) > maxblocks:
1551 i = random.randrange(maxblocks)
1552 del blocks[i]
1553
1554 # get the heap object
1555 heap = multiprocessing.heap.BufferWrapper._heap
1556
1557 # verify the state of the heap
1558 all = []
1559 occupied = 0
1560 for L in list(heap._len_to_seq.values()):
1561 for arena, start, stop in L:
1562 all.append((heap._arenas.index(arena), start, stop,
1563 stop-start, 'free'))
1564 for arena, start, stop in heap._allocated_blocks:
1565 all.append((heap._arenas.index(arena), start, stop,
1566 stop-start, 'occupied'))
1567 occupied += (stop-start)
1568
1569 all.sort()
1570
1571 for i in range(len(all)-1):
1572 (arena, start, stop) = all[i][:3]
1573 (narena, nstart, nstop) = all[i+1][:3]
1574 self.assertTrue((arena != narena and nstart == 0) or
1575 (stop == nstart))
1576
1577#
1578#
1579#
1580
Benjamin Petersone711caf2008-06-11 16:44:04 +00001581class _Foo(Structure):
1582 _fields_ = [
1583 ('x', c_int),
1584 ('y', c_double)
1585 ]
1586
1587class _TestSharedCTypes(BaseTestCase):
1588
1589 ALLOWED_TYPES = ('processes',)
1590
1591 def _double(self, x, y, foo, arr, string):
1592 x.value *= 2
1593 y.value *= 2
1594 foo.x *= 2
1595 foo.y *= 2
1596 string.value *= 2
1597 for i in range(len(arr)):
1598 arr[i] *= 2
1599
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001600 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001601 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001602 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001603 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001604 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001605 arr = self.Array('d', list(range(10)), lock=lock)
1606 string = self.Array('c', 20, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001607 string.value = 'hello'
1608
1609 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1610 p.start()
1611 p.join()
1612
1613 self.assertEqual(x.value, 14)
1614 self.assertAlmostEqual(y.value, 2.0/3.0)
1615 self.assertEqual(foo.x, 6)
1616 self.assertAlmostEqual(foo.y, 4.0)
1617 for i in range(10):
1618 self.assertAlmostEqual(arr[i], i*2)
1619 self.assertEqual(string.value, latin('hellohello'))
1620
1621 def test_synchronize(self):
1622 self.test_sharedctypes(lock=True)
1623
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001624 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001625 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001626 foo = _Foo(2, 5.0)
1627 bar = copy(foo)
1628 foo.x = 0
1629 foo.y = 0
1630 self.assertEqual(bar.x, 2)
1631 self.assertAlmostEqual(bar.y, 5.0)
1632
1633#
1634#
1635#
1636
1637class _TestFinalize(BaseTestCase):
1638
1639 ALLOWED_TYPES = ('processes',)
1640
1641 def _test_finalize(self, conn):
1642 class Foo(object):
1643 pass
1644
1645 a = Foo()
1646 util.Finalize(a, conn.send, args=('a',))
1647 del a # triggers callback for a
1648
1649 b = Foo()
1650 close_b = util.Finalize(b, conn.send, args=('b',))
1651 close_b() # triggers callback for b
1652 close_b() # does nothing because callback has already been called
1653 del b # does nothing because callback has already been called
1654
1655 c = Foo()
1656 util.Finalize(c, conn.send, args=('c',))
1657
1658 d10 = Foo()
1659 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1660
1661 d01 = Foo()
1662 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1663 d02 = Foo()
1664 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1665 d03 = Foo()
1666 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1667
1668 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1669
1670 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1671
1672 # call mutliprocessing's cleanup function then exit process without
1673 # garbage collecting locals
1674 util._exit_function()
1675 conn.close()
1676 os._exit(0)
1677
1678 def test_finalize(self):
1679 conn, child_conn = self.Pipe()
1680
1681 p = self.Process(target=self._test_finalize, args=(child_conn,))
1682 p.start()
1683 p.join()
1684
1685 result = [obj for obj in iter(conn.recv, 'STOP')]
1686 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1687
1688#
1689# Test that from ... import * works for each module
1690#
1691
1692class _TestImportStar(BaseTestCase):
1693
1694 ALLOWED_TYPES = ('processes',)
1695
1696 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001697 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00001698 'multiprocessing', 'multiprocessing.connection',
1699 'multiprocessing.heap', 'multiprocessing.managers',
1700 'multiprocessing.pool', 'multiprocessing.process',
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001701 'multiprocessing.reduction',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001702 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001703 ]
1704
1705 if c_int is not None:
1706 # This module requires _ctypes
1707 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001708
1709 for name in modules:
1710 __import__(name)
1711 mod = sys.modules[name]
1712
1713 for attr in getattr(mod, '__all__', ()):
1714 self.assertTrue(
1715 hasattr(mod, attr),
1716 '%r does not have attribute %r' % (mod, attr)
1717 )
1718
1719#
1720# Quick test that logging works -- does not test logging output
1721#
1722
1723class _TestLogging(BaseTestCase):
1724
1725 ALLOWED_TYPES = ('processes',)
1726
1727 def test_enable_logging(self):
1728 logger = multiprocessing.get_logger()
1729 logger.setLevel(util.SUBWARNING)
1730 self.assertTrue(logger is not None)
1731 logger.debug('this will not be printed')
1732 logger.info('nor will this')
1733 logger.setLevel(LOG_LEVEL)
1734
1735 def _test_level(self, conn):
1736 logger = multiprocessing.get_logger()
1737 conn.send(logger.getEffectiveLevel())
1738
1739 def test_level(self):
1740 LEVEL1 = 32
1741 LEVEL2 = 37
1742
1743 logger = multiprocessing.get_logger()
1744 root_logger = logging.getLogger()
1745 root_level = root_logger.level
1746
1747 reader, writer = multiprocessing.Pipe(duplex=False)
1748
1749 logger.setLevel(LEVEL1)
1750 self.Process(target=self._test_level, args=(writer,)).start()
1751 self.assertEqual(LEVEL1, reader.recv())
1752
1753 logger.setLevel(logging.NOTSET)
1754 root_logger.setLevel(LEVEL2)
1755 self.Process(target=self._test_level, args=(writer,)).start()
1756 self.assertEqual(LEVEL2, reader.recv())
1757
1758 root_logger.setLevel(root_level)
1759 logger.setLevel(level=LOG_LEVEL)
1760
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001761
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00001762# class _TestLoggingProcessName(BaseTestCase):
1763#
1764# def handle(self, record):
1765# assert record.processName == multiprocessing.current_process().name
1766# self.__handled = True
1767#
1768# def test_logging(self):
1769# handler = logging.Handler()
1770# handler.handle = self.handle
1771# self.__handled = False
1772# # Bypass getLogger() and side-effects
1773# logger = logging.getLoggerClass()(
1774# 'multiprocessing.test.TestLoggingProcessName')
1775# logger.addHandler(handler)
1776# logger.propagate = False
1777#
1778# logger.warn('foo')
1779# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001780
Benjamin Petersone711caf2008-06-11 16:44:04 +00001781#
Jesse Noller6214edd2009-01-19 16:23:53 +00001782# Test to verify handle verification, see issue 3321
1783#
1784
1785class TestInvalidHandle(unittest.TestCase):
1786
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001787 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00001788 def test_invalid_handles(self):
Jesse Noller6214edd2009-01-19 16:23:53 +00001789 conn = _multiprocessing.Connection(44977608)
1790 self.assertRaises(IOError, conn.poll)
1791 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001792
Jesse Noller6214edd2009-01-19 16:23:53 +00001793#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001794# Functions used to create test cases from the base ones in this module
1795#
1796
1797def get_attributes(Source, names):
1798 d = {}
1799 for name in names:
1800 obj = getattr(Source, name)
1801 if type(obj) == type(get_attributes):
1802 obj = staticmethod(obj)
1803 d[name] = obj
1804 return d
1805
1806def create_test_cases(Mixin, type):
1807 result = {}
1808 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001809 Type = type.capitalize()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001810
1811 for name in list(glob.keys()):
1812 if name.startswith('_Test'):
1813 base = glob[name]
1814 if type in base.ALLOWED_TYPES:
1815 newname = 'With' + Type + name[1:]
1816 class Temp(base, unittest.TestCase, Mixin):
1817 pass
1818 result[newname] = Temp
1819 Temp.__name__ = newname
1820 Temp.__module__ = Mixin.__module__
1821 return result
1822
1823#
1824# Create test cases
1825#
1826
1827class ProcessesMixin(object):
1828 TYPE = 'processes'
1829 Process = multiprocessing.Process
1830 locals().update(get_attributes(multiprocessing, (
1831 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1832 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1833 'RawArray', 'current_process', 'active_children', 'Pipe',
1834 'connection', 'JoinableQueue'
1835 )))
1836
1837testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1838globals().update(testcases_processes)
1839
1840
1841class ManagerMixin(object):
1842 TYPE = 'manager'
1843 Process = multiprocessing.Process
1844 manager = object.__new__(multiprocessing.managers.SyncManager)
1845 locals().update(get_attributes(manager, (
1846 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1847 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1848 'Namespace', 'JoinableQueue'
1849 )))
1850
1851testcases_manager = create_test_cases(ManagerMixin, type='manager')
1852globals().update(testcases_manager)
1853
1854
1855class ThreadsMixin(object):
1856 TYPE = 'threads'
1857 Process = multiprocessing.dummy.Process
1858 locals().update(get_attributes(multiprocessing.dummy, (
1859 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1860 'Condition', 'Event', 'Value', 'Array', 'current_process',
1861 'active_children', 'Pipe', 'connection', 'dict', 'list',
1862 'Namespace', 'JoinableQueue'
1863 )))
1864
1865testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1866globals().update(testcases_threads)
1867
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001868class OtherTest(unittest.TestCase):
1869 # TODO: add more tests for deliver/answer challenge.
1870 def test_deliver_challenge_auth_failure(self):
1871 class _FakeConnection(object):
1872 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00001873 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001874 def send_bytes(self, data):
1875 pass
1876 self.assertRaises(multiprocessing.AuthenticationError,
1877 multiprocessing.connection.deliver_challenge,
1878 _FakeConnection(), b'abc')
1879
1880 def test_answer_challenge_auth_failure(self):
1881 class _FakeConnection(object):
1882 def __init__(self):
1883 self.count = 0
1884 def recv_bytes(self, size):
1885 self.count += 1
1886 if self.count == 1:
1887 return multiprocessing.connection.CHALLENGE
1888 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00001889 return b'something bogus'
1890 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001891 def send_bytes(self, data):
1892 pass
1893 self.assertRaises(multiprocessing.AuthenticationError,
1894 multiprocessing.connection.answer_challenge,
1895 _FakeConnection(), b'abc')
1896
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00001897#
1898# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1899#
1900
1901def initializer(ns):
1902 ns.test += 1
1903
1904class TestInitializers(unittest.TestCase):
1905 def setUp(self):
1906 self.mgr = multiprocessing.Manager()
1907 self.ns = self.mgr.Namespace()
1908 self.ns.test = 0
1909
1910 def tearDown(self):
1911 self.mgr.shutdown()
1912
1913 def test_manager_initializer(self):
1914 m = multiprocessing.managers.SyncManager()
1915 self.assertRaises(TypeError, m.start, 1)
1916 m.start(initializer, (self.ns,))
1917 self.assertEqual(self.ns.test, 1)
1918 m.shutdown()
1919
1920 def test_pool_initializer(self):
1921 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
1922 p = multiprocessing.Pool(1, initializer, (self.ns,))
1923 p.close()
1924 p.join()
1925 self.assertEqual(self.ns.test, 1)
1926
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00001927#
1928# Issue 5155, 5313, 5331: Test process in processes
1929# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1930#
1931
1932def _ThisSubProcess(q):
1933 try:
1934 item = q.get(block=False)
1935 except pyqueue.Empty:
1936 pass
1937
1938def _TestProcess(q):
1939 queue = multiprocessing.Queue()
1940 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
1941 subProc.start()
1942 subProc.join()
1943
1944def _afunc(x):
1945 return x*x
1946
1947def pool_in_process():
1948 pool = multiprocessing.Pool(processes=4)
1949 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
1950
1951class _file_like(object):
1952 def __init__(self, delegate):
1953 self._delegate = delegate
1954 self._pid = None
1955
1956 @property
1957 def cache(self):
1958 pid = os.getpid()
1959 # There are no race conditions since fork keeps only the running thread
1960 if pid != self._pid:
1961 self._pid = pid
1962 self._cache = []
1963 return self._cache
1964
1965 def write(self, data):
1966 self.cache.append(data)
1967
1968 def flush(self):
1969 self._delegate.write(''.join(self.cache))
1970 self._cache = []
1971
1972class TestStdinBadfiledescriptor(unittest.TestCase):
1973
1974 def test_queue_in_process(self):
1975 queue = multiprocessing.Queue()
1976 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
1977 proc.start()
1978 proc.join()
1979
1980 def test_pool_in_process(self):
1981 p = multiprocessing.Process(target=pool_in_process)
1982 p.start()
1983 p.join()
1984
1985 def test_flushing(self):
1986 sio = io.StringIO()
1987 flike = _file_like(sio)
1988 flike.write('foo')
1989 proc = multiprocessing.Process(target=lambda: flike.flush())
1990 flike.flush()
1991 assert sio.getvalue() == 'foo'
1992
1993testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
1994 TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001995
Benjamin Petersone711caf2008-06-11 16:44:04 +00001996#
1997#
1998#
1999
2000def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00002001 if sys.platform.startswith("linux"):
2002 try:
2003 lock = multiprocessing.RLock()
2004 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00002005 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00002006
Benjamin Petersone711caf2008-06-11 16:44:04 +00002007 if run is None:
2008 from test.support import run_unittest as run
2009
2010 util.get_temp_dir() # creates temp directory for use by all processes
2011
2012 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2013
Benjamin Peterson41181742008-07-02 20:22:54 +00002014 ProcessesMixin.pool = multiprocessing.Pool(4)
2015 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2016 ManagerMixin.manager.__init__()
2017 ManagerMixin.manager.start()
2018 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002019
2020 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00002021 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2022 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002023 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2024 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00002025 )
2026
2027 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2028 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
2029 run(suite)
2030
Benjamin Peterson41181742008-07-02 20:22:54 +00002031 ThreadsMixin.pool.terminate()
2032 ProcessesMixin.pool.terminate()
2033 ManagerMixin.pool.terminate()
2034 ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002035
Benjamin Peterson41181742008-07-02 20:22:54 +00002036 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002037
2038def main():
2039 test_main(unittest.TextTestRunner(verbosity=2).run)
2040
2041if __name__ == '__main__':
2042 main()