blob: 0bc056fec105a05b45d1122afdcbb9396b444abf [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
30import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.dummy
33import multiprocessing.connection
34import multiprocessing.managers
35import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
Charles-François Natalibc8f0822011-09-20 20:36:51 +020038from multiprocessing import util
39
40try:
41 from multiprocessing import reduction
42 HAS_REDUCTION = True
43except ImportError:
44 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000045
Brian Curtinafa88b52010-10-07 01:12:19 +000046try:
47 from multiprocessing.sharedctypes import Value, copy
48 HAS_SHAREDCTYPES = True
49except ImportError:
50 HAS_SHAREDCTYPES = False
51
Antoine Pitroubcb39d42011-08-23 19:46:22 +020052try:
53 import msvcrt
54except ImportError:
55 msvcrt = None
56
Benjamin Petersone711caf2008-06-11 16:44:04 +000057#
58#
59#
60
def latin(s):
    """Return the str *s* encoded to bytes using the 'latin' codec."""
    return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000063
Benjamin Petersone711caf2008-06-11 16:44:04 +000064#
65# Constants
66#
67
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Base unit (in seconds) for the short sleeps used throughout the tests.
DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# True unless the C extension flags its semaphore get_value() as broken.
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)
83
WIN32 = (sys.platform == "win32")
if WIN32:
    from _subprocess import WaitForSingleObject, INFINITE, WAIT_OBJECT_0

    def wait_for_handle(handle, timeout):
        """Return True if *handle* is signalled within *timeout* seconds.

        A *timeout* of None or a negative value waits indefinitely.
        """
        if timeout is None or timeout < 0.0:
            timeout = INFINITE
        else:
            # WaitForSingleObject is given the timeout in milliseconds.
            timeout = int(1000 * timeout)
        return WaitForSingleObject(handle, timeout) == WAIT_OBJECT_0
else:
    from select import select
    _select = util._eintr_retry(select)

    def wait_for_handle(handle, timeout):
        """Return True if *handle* is readable within *timeout* seconds.

        A negative *timeout* is treated like None (passed to select as-is).
        """
        if timeout is not None and timeout < 0.0:
            timeout = None
        return handle in _select([handle], [], [], timeout)[0]
Jesse Noller6214edd2009-01-19 16:23:53 +0000102
# Upper bound on file descriptors; falls back to 256 when the platform
# cannot report SC_OPEN_MAX.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # AttributeError: os.sysconf is missing on this platform;
    # ValueError/OSError: the configuration name is unknown or unsupported.
    MAXFD = 256
107
Benjamin Petersone711caf2008-06-11 16:44:04 +0000108#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000109# Some tests require ctypes
110#
111
112try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000113 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000114except ImportError:
115 Structure = object
116 c_int = c_double = None
117
118#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000119# Creates a wrapper for a function which records the time it takes to finish
120#
121
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    After a call, ``elapsed`` holds the duration in seconds of the most
    recent call; it is None until the wrapper has been called once.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        t = time.time()
        try:
            return self.func(*args, **kwds)
        finally:
            # Recorded even when func raises, so callers can time failures.
            self.elapsed = time.time() - t
134
135#
136# Base class for test cases
137#
138
class BaseTestCase(object):
    """Shared assertion helpers for the test-case classes in this file.

    The helpers call ``self.assertEqual``/``self.assertAlmostEqual``, which
    are not defined here -- presumably the concrete test classes also derive
    from unittest.TestCase (verify where those classes are assembled).
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timing comparisons are only enforced when CHECK_TIMINGS is set.
        if CHECK_TIMINGS:
            self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Check func(*args) == value, but tolerate NotImplementedError.
        try:
            res = func(*args)
        except NotImplementedError:
            pass
        else:
            return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
161
Benjamin Petersone711caf2008-06-11 16:44:04 +0000162#
163# Return the value of a semaphore
164#
165
def get_value(self):
    """Return the current counter value of the semaphore-like object *self*.

    Tries, in order, a ``get_value()`` method, then the attributes
    ``_Semaphore__value`` and ``_value``.

    Raises NotImplementedError if none of these is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    # Fall back to the private attributes, in the original lookup order.
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
177
178#
179# Testcases
180#
181
class _TestProcess(BaseTestCase):
    """Tests for creating, inspecting, terminating and nesting processes.

    ``self.TYPE``, ``self.Process``, ``self.Queue``, ``self.Pipe`` and
    friends are not defined in this class -- presumably supplied by a mixin
    defined elsewhere (verify).
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertGreater(len(authkey), 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertIsNone(current.exitcode)

    def test_daemon_argument(self):
        if self.TYPE == "threads":
            return

        # By default uses the current process's daemon flag.
        proc0 = self.Process(target=self._test)
        self.assertEqual(proc0.daemon, self.current_process().daemon)
        proc1 = self.Process(target=self._test, daemon=True)
        self.assertTrue(proc1.daemon)
        proc2 = self.Process(target=self._test, daemon=False)
        self.assertFalse(proc2.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Child side of test_process: report what the child observed.
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello': 23, 'bye': 2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIs(type(self.active_children()), list)
        self.assertIsNone(p.exitcode)

        p.start()

        self.assertIsNone(p.exitcode)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child echoes back everything after the queue argument.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        # Sleep long enough that only terminate() ends the child.
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertIsNone(p.exitcode)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertIsNone(join())
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIs(type(cpus), int)
        self.assertGreaterEqual(cpus, 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # NOTE(review): 'forking' is imported but not referenced below; it
        # may be a deliberate import check -- confirm before removing.
        from multiprocessing import forking
        wconn.send(id)
        if len(id) < 2:
            # Spawn two children, each extending the id path by one element.
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        # Each child is joined before the next starts, so the ids arrive
        # in depth-first order.
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sentinel(cls, event):
        event.wait(10.0)

    def test_sentinel(self):
        if self.TYPE == "threads":
            return
        event = self.Event()
        p = self.Process(target=self._test_sentinel, args=(event,))
        # The sentinel is not available before the process is started.
        with self.assertRaises(ValueError):
            p.sentinel
        p.start()
        self.addCleanup(p.join)
        sentinel = p.sentinel
        self.assertIsInstance(sentinel, int)
        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
        event.set()
        p.join()
        self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
363
Benjamin Petersone711caf2008-06-11 16:44:04 +0000364#
365#
366#
367
368class _UpperCaser(multiprocessing.Process):
369
370 def __init__(self):
371 multiprocessing.Process.__init__(self)
372 self.child_conn, self.parent_conn = multiprocessing.Pipe()
373
374 def run(self):
375 self.parent_conn.close()
376 for s in iter(self.child_conn.recv, None):
377 self.child_conn.send(s.upper())
378 self.child_conn.close()
379
380 def submit(self, s):
381 assert type(s) is str
382 self.parent_conn.send(s)
383 return self.parent_conn.recv()
384
385 def stop(self):
386 self.parent_conn.send(None)
387 self.parent_conn.close()
388 self.child_conn.close()
389
class _TestSubclassingProcess(BaseTestCase):
    """Check that a Process subclass (_UpperCaser) works end to end."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        uppercaser = _UpperCaser()
        uppercaser.daemon = True
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()
402
403#
404#
405#
406
def queue_empty(q):
    """Return whether queue *q* is empty.

    Uses ``q.empty()`` when the queue provides it, otherwise compares
    ``q.qsize()`` with zero.
    """
    if hasattr(q, 'empty'):
        return q.empty()
    return q.qsize() == 0
412
def queue_full(q, maxsize):
    """Return whether queue *q* is full.

    Uses ``q.full()`` when the queue provides it, otherwise compares
    ``q.qsize()`` with *maxsize*.
    """
    if hasattr(q, 'full'):
        return q.full()
    return q.qsize() == maxsize
418
419
class _TestQueue(BaseTestCase):
    """Tests for queue put/get semantics, forking, qsize and task_done.

    ``self.Queue``, ``self.JoinableQueue``, ``self.Event`` and
    ``self.Process`` are not defined in this class -- presumably supplied
    by a mixin defined elsewhere (verify).
    """

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_put: drain the six items the parent queued.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Exercise each accepted calling form of put().
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_get: queue the items the parent will read.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() is unavailable here, so there is nothing to check.
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker: acknowledge every item until the None sentinel arrives.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        # Blocks until every queued item has been acknowledged.
        queue.join()

        # One sentinel per worker so that each of them exits.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
629
630#
631#
632#
633
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock acquire/release behaviour."""

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # A second, non-blocking acquire must fail while the lock is held.
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # Releasing an unheld lock is an error.
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        # A recursive lock can be acquired repeatedly by the same owner...
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        # ...and must be released the same number of times.
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        with self.Lock():
            pass
656
Benjamin Petersone711caf2008-06-11 16:44:04 +0000657
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore counting and timeouts."""

    def _test_semaphore(self, sem):
        # Shared scenario for a semaphore created with an initial value
        # of 2: take both slots, fail a non-blocking acquire, then give
        # both slots back.
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # An unbounded semaphore may be released past its initial value.
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # Non-blocking acquires return at once, whatever the timeout.
        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        # Blocking acquires give up once the timeout has elapsed.
        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
710
711
class _TestCondition(BaseTestCase):
    """Tests for Condition.notify(), notify_all() and wait() timeouts."""

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Worker body: announce that we are about to sleep, wait on the
        # condition, then announce that we woke up (or timed out).
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Both workers have been notified by now; reap them so neither
        # outlives the test.
        thread.join()
        proc.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)
        workers = []

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        for i in range(10):
            try:
                if get_value(woken) == 6:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Every worker has either timed out or been notified; reap them
        # so none outlives the test.
        for worker in workers:
            worker.join()

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        # Nobody notifies the condition, so the wait must time out.
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
849
850
class _TestEvent(BaseTestCase):
    """Tests for Event set/clear/wait, including a set from a child."""

    @classmethod
    def _test_event(cls, event):
        # Child side: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A fresh event starts out unset.
        self.assertEqual(event.is_set(), False)

        # wait() reports the flag, so it returns False when it times out
        # on an unset event.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once set, wait() returns True immediately.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # Reap the child so it does not outlive the test.
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000891
892#
893#
894#
895
class _TestValue(BaseTestCase):
    """Tests for shared Value/RawValue objects and their lock accessors."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Child side: overwrite each shared value with its third column.
        for sv, cv in zip(values, cls.codes_values):
            sv.value = cv[2]

    def test_value(self, raw=False):
        if raw:
            values = [self.RawValue(code, value)
                      for code, value, _ in self.codes_values]
        else:
            values = [self.Value(code, value)
                      for code, value, _ in self.codes_values]

        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.daemon = True
        proc.start()
        proc.join()

        # The child's writes must be visible in the parent.
        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # Default locking: both accessors must be callable.
        val1 = self.Value('i', 5)
        val1.get_lock()
        val1.get_obj()

        # lock=None: both accessors must be callable as well.
        val2 = self.Value('i', 5, lock=None)
        val2.get_lock()
        val2.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields an object without the accessors.
        arr4 = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        arr5 = self.RawValue('i', 5)
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
963
Benjamin Petersone711caf2008-06-11 16:44:04 +0000964
class _TestArray(BaseTestCase):
    # Tests for the shared Array / RawArray objects.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # Turn seq into its running sum, in place.  Used both directly on
        # a list and as the child-process target on a shared array.
        for i in range(1, len(seq)):
            seq[i] += seq[i-1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        # With raw=True the array is built with RawArray instead of Array.
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        if raw:
            arr = self.RawArray('i', seq)
        else:
            arr = self.Array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Slice assignment must behave the same on both containers.
        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        # Apply f() locally to the list and in a child to the shared
        # array; both must end up with the same contents.
        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.daemon = True
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # get_lock()/get_obj() must be callable for the default lock ...
        arr1 = self.Array('i', list(range(10)))
        arr1.get_lock()
        arr1.get_obj()

        # ... and for an explicit lock=None.
        arr2 = self.Array('i', list(range(10)), lock=None)
        arr2.get_lock()
        arr2.get_obj()

        # A caller-supplied lock is the one handed back by get_lock().
        lock = self.Lock()
        arr3 = self.Array('i', list(range(10)), lock=lock)
        lock3 = arr3.get_lock()
        arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False gives an object without the accessors.
        arr4 = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))
        # Anything that is not a lock is rejected.
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # RawArray objects have no accessors either.
        arr5 = self.RawArray('i', range(10))
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001043
1044#
1045#
1046#
1047
class _TestContainers(BaseTestCase):
    # Tests for the list, dict and Namespace objects created through
    # the manager.

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        a = self.list(list(range(10)))
        self.assertEqual(a[:], list(range(10)))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        # None of the operations on b may have touched a.
        self.assertEqual(a[:], list(range(10)))

        # A shared list built from other shared lists.
        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A change made to a after f was created must show up through f.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), {i: chr(i) for i in indices})
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The expected str() lists neither the deleted attribute nor the
        # underscore-prefixed one.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1103
1104#
1105#
1106#
1107
def sqr(x, wait=0.0):
    """Return x squared after sleeping for `wait` seconds.

    Module-level so that it can be sent to pool workers; the optional
    delay lets tests simulate a slow task.
    """
    time.sleep(wait)
    return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00001111
class _TestPool(BaseTestCase):
    # Tests for the Pool API, run against self.pool.

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, list(range(100)))))

    def test_map_chunksize(self):
        # map_async() on an empty iterable with an explicit chunksize
        # must complete rather than time out.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        # The task sleeps for TIMEOUT1, so get() should block about that long.
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the get() timeout, so get() must
        # raise TimeoutError after roughly TIMEOUT2.
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, list(range(10)))))

        # Results must come back in order and the iterator must then
        # be exhausted.
        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        # Order is not guaranteed, so compare the sorted results.
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

    def test_make_pool(self):
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Release the workers even if the assertion above fails.
            p.close()
            p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            self.skipTest("terminating workers may leak manager references")

        # Queue far more work than can finish quickly, then terminate.
        self.pool.map_async(
            time.sleep, [0.1] * 10000, chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        # join() must not have waited for the queued tasks to run.
        self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001190
def raising():
    """Always raise KeyError("key"); used as a failing pool task."""
    raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00001193
def unpickleable_result():
    """Return a lambda, i.e. a result that pickle cannot serialize.

    Used as a pool task whose return value cannot be sent back to the
    parent process.
    """
    return lambda: 42
1196
class _TestPoolWorkerErrors(BaseTestCase):
    # Tests for how a Pool reports errors raised in, or while returning
    # results from, worker processes.
    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        p = multiprocessing.Pool(2)
        try:
            # One-element list so the callback can hand the exception
            # back to the enclosing scope.
            scratchpad = [None]
            def errback(exc):
                scratchpad[0] = exc

            res = p.apply_async(raising, error_callback=errback)
            self.assertRaises(KeyError, res.get)
            self.assertTrue(scratchpad[0])
            self.assertIsInstance(scratchpad[0], KeyError)
        finally:
            # Release the workers even if an assertion above fails.
            p.close()
            p.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)
        try:
            # Make sure we don't lose pool processes because of encoding
            # errors.
            for iteration in range(20):

                scratchpad = [None]
                def errback(exc):
                    scratchpad[0] = exc

                res = p.apply_async(unpickleable_result,
                                    error_callback=errback)
                self.assertRaises(MaybeEncodingError, res.get)
                wrapped = scratchpad[0]
                self.assertTrue(wrapped)
                self.assertIsInstance(scratchpad[0], MaybeEncodingError)
                self.assertIsNotNone(wrapped.exc)
                self.assertIsNotNone(wrapped.value)
        finally:
            # Release the workers even if an assertion above fails.
            p.close()
            p.join()
1236
class _TestPoolWorkerLifetime(BaseTestCase):
    # Tests for pools created with maxtasksperchild.
    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        try:
            self.assertEqual(3, len(p._pool))
            origworkerpids = [w.pid for w in p._pool]
            # Run many tasks so each worker gets replaced (hopefully)
            results = []
            for i in range(100):
                results.append(p.apply_async(sqr, (i, )))
            # Fetch the results and verify we got the right answers,
            # also ensuring all the tasks have completed.
            for (j, res) in enumerate(results):
                self.assertEqual(res.get(), sqr(j))
            # Refill the pool
            p._repopulate_pool()
            # Wait until all workers are alive
            # (countdown * DELTA = 5 seconds max startup process time)
            countdown = 50
            while countdown and not all(w.is_alive() for w in p._pool):
                countdown -= 1
                time.sleep(DELTA)
            finalworkerpids = [w.pid for w in p._pool]
            # All pids should be assigned.  See issue #7805.
            self.assertNotIn(None, origworkerpids)
            self.assertNotIn(None, finalworkerpids)
            # Finally, check that the worker pids have changed
            self.assertNotEqual(sorted(origworkerpids),
                                sorted(finalworkerpids))
        finally:
            # Release the workers even if an assertion above fails.
            p.close()
            p.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        p = multiprocessing.Pool(3, maxtasksperchild=1)
        results = []
        for i in range(6):
            results.append(p.apply_async(sqr, (i, 0.3)))
        p.close()
        p.join()
        # check the results
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
1281
1282
#
# Test that the manager has the expected number of shared objects left
#
1286
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Fetched right after the count so the diagnostic output matches
        # the number being checked.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1305
#
# Test of creating a customized manager class
#
1309
1310from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1311
class FooBar(object):
    # Simple class registered with MyManager.  It offers a public method
    # that returns a value, a public method that raises, and an
    # underscore-prefixed method.
    def f(self):
        return 'f()'
    def g(self):
        raise ValueError
    def _h(self):
        return '_h()'
1319
def baz():
    """Generate the squares of 0 through 9."""
    for i in range(10):
        yield i*i
1323
class IteratorProxy(BaseProxy):
    # Proxy that makes a remote iterator usable in a local for loop:
    # iteration returns the proxy itself and each step forwards
    # __next__ to the referent.
    _exposed_ = ('__next__',)
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')
1330
class MyManager(BaseManager):
    # Customized manager; its typeids are registered right below.
    pass

# 'Foo' and 'Bar' are both backed by FooBar; 'Bar' names its exposed
# methods explicitly.  'baz' serves the generator through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1337
1338
class _TestMyManager(BaseTestCase):
    # Tests the typeids registered on MyManager.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            baz = manager.baz()

            # Which of the FooBar methods each proxy makes available.
            foo_methods = [name for name in ('f', 'g', '_h')
                           if hasattr(foo, name)]
            bar_methods = [name for name in ('f', 'g', '_h')
                           if hasattr(bar, name)]

            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            # _h is not available on foo, so calling it by name must fail.
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(baz), [i*i for i in range(10)])
        finally:
            # Stop the manager process even if an assertion above fails.
            manager.shutdown()
1370
#
# Test of connecting to a remote server and using xmlrpclib for serialization
#
1374
# Single module-level queue handed out by get_queue().
_queue = pyqueue.Queue()
def get_queue():
    """Return the module-level queue (the same object on every call)."""
    return _queue
1378
class QueueManager(BaseManager):
    '''manager class used by server process'''
# Registered with a callable, so this manager can create the object.
QueueManager.register('get_queue', callable=get_queue)
1382
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''
# Registered without a callable: only the typeid is declared here.
QueueManager2.register('get_queue')
1386
1387
# Serializer name passed to the managers in the remote-manager tests.
SERIALIZER = 'xmlrpclib'
1389
class _TestRemoteManager(BaseTestCase):
    # Tests talking to a manager server through separately created
    # manager objects that use the SERIALIZER serializer.

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        # Child-process target: connect to the server at `address` and
        # put one tuple on its queue.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()

        # Second client, connected from this process.
        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1431
class _TestManagerRestart(BaseTestCase):
    # Tests that a manager can be shut down and started again on the
    # same address straight away.

    @classmethod
    def _putter(cls, address, authkey):
        # Child-process target: connect to the server at `address` and
        # put one string on its queue.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        # Remember the concrete address so the restart can reuse it.
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # The child has delivered its message; wait for it to exit
        # before the server is stopped.
        p.join()
        del queue
        manager.shutdown()
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except IOError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # The replacement must be started too; shutdown() below is
            # only usable on a manager that has been started.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001473
Benjamin Petersone711caf2008-06-11 16:44:04 +00001474#
1475#
1476#
1477
# Empty message that tells the _TestConnection echo child to stop.
SENTINEL = latin('')
1479
class _TestConnection(BaseTestCase):
    # Tests for Connection objects returned by Pipe(), including the
    # transfer of file descriptors/handles through
    # multiprocessing.reduction.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Child target: send every received message straight back until
        # SENTINEL arrives, then close the connection.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a pre-zeroed buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A buffer that is too small must raise BufferTooShort, with
            # the complete message carried in the exception.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing is pending: poll() returns False at once, and
        # poll(timeout) returns False after the timeout.
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # Once the echo is available poll() returns True without waiting.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16 MiB
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            # The child has closed its end, so further reads hit EOF.
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            # Using an end in the wrong direction must fail.
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # send_bytes(buf, offset) and send_bytes(buf, offset, size).
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the length sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Out-of-range or negative offset/size combinations are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        # Return True if fstat() succeeds on fd and False if it fails
        # with EBADF; any other OSError propagates.
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        # Child target: receive a handle over conn, write data to it
        # and close it.
        if create_dummy_fds:
            # Occupy every free descriptor below 256 with a duplicate
            # of the connection's descriptor.
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        # The child must have written through the transferred handle.
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Find a free descriptor number of at least 256.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        # Child target: write one raw byte to the connection's
        # descriptor without attaching a file descriptor.
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001724
class _TestListenerClient(BaseTestCase):
    # Tests a Listener/Client round trip for every connection family.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child target: connect to `address`, send one message, close.
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(listener.address,))
            p.daemon = True
            p.start()
            conn = listener.accept()
            self.assertEqual(conn.recv(), 'hello')
            p.join()
            listener.close()
#
# Test of sending connection and socket objects between processes
#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001748"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001749class _TestPicklingConnections(BaseTestCase):
1750
1751 ALLOWED_TYPES = ('processes',)
1752
1753 def _listener(self, conn, families):
1754 for fam in families:
1755 l = self.connection.Listener(family=fam)
1756 conn.send(l.address)
1757 new_conn = l.accept()
1758 conn.send(new_conn)
1759
1760 if self.TYPE == 'processes':
1761 l = socket.socket()
1762 l.bind(('localhost', 0))
1763 conn.send(l.getsockname())
1764 l.listen(1)
1765 new_conn, addr = l.accept()
1766 conn.send(new_conn)
1767
1768 conn.recv()
1769
1770 def _remote(self, conn):
1771 for (address, msg) in iter(conn.recv, None):
1772 client = self.connection.Client(address)
1773 client.send(msg.upper())
1774 client.close()
1775
1776 if self.TYPE == 'processes':
1777 address, msg = conn.recv()
1778 client = socket.socket()
1779 client.connect(address)
1780 client.sendall(msg.upper())
1781 client.close()
1782
1783 conn.close()
1784
1785 def test_pickling(self):
1786 try:
1787 multiprocessing.allow_connection_pickling()
1788 except ImportError:
1789 return
1790
1791 families = self.connection.families
1792
1793 lconn, lconn0 = self.Pipe()
1794 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02001795 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001796 lp.start()
1797 lconn0.close()
1798
1799 rconn, rconn0 = self.Pipe()
1800 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001801 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001802 rp.start()
1803 rconn0.close()
1804
1805 for fam in families:
1806 msg = ('This connection uses family %s' % fam).encode('ascii')
1807 address = lconn.recv()
1808 rconn.send((address, msg))
1809 new_conn = lconn.recv()
1810 self.assertEqual(new_conn.recv(), msg.upper())
1811
1812 rconn.send(None)
1813
1814 if self.TYPE == 'processes':
1815 msg = latin('This connection uses a normal socket')
1816 address = lconn.recv()
1817 rconn.send((address, msg))
1818 if hasattr(socket, 'fromfd'):
1819 new_conn = lconn.recv()
1820 self.assertEqual(new_conn.recv(100), msg.upper())
1821 else:
1822 # XXX On Windows with Py2.6 need to backport fromfd()
1823 discard = lconn.recv_bytes()
1824
1825 lconn.send(None)
1826
1827 rconn.close()
1828 lconn.close()
1829
1830 lp.join()
1831 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001832"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001833#
1834#
1835#
1836
class _TestHeap(BaseTestCase):
    """Tests for block allocation through multiprocessing.heap.BufferWrapper."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        """Churn many blocks, then check that the heap bookkeeping is gapless.

        Every free and allocated block is collected as
        (arena index, start, stop, length, state).  Once sorted, each block
        must either end exactly where the next one starts, or be followed by
        the first block (start == 0) of a different arena.
        """
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                # drop a random older block so that it gets freed
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap; the lock is held until the test's
        # cleanups run so the bookkeeping cannot change underneath us
        segments = []
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for free_blocks in list(heap._len_to_seq.values()):
            for arena, start, stop in free_blocks:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop - start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop - start, 'occupied'))

        segments.sort()

        for current, following in zip(segments, segments[1:]):
            arena, _, stop = current[:3]
            narena, nstart, _ = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        """Allocate blocks in reference cycles so that the gc frees them."""
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for _ in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a
1901
Benjamin Petersone711caf2008-06-11 16:44:04 +00001902#
1903#
1904#
1905
class _Foo(Structure):
    # Two-field structure (``x`` declared with c_int, ``y`` with c_double)
    # used by _TestSharedCTypes as a shared value and as a copy() source.
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]
1911
class _TestSharedCTypes(BaseTestCase):
    """Check that shared ctypes objects modified by a child are seen here."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Target of the child process: double every shared object in place.
        for shared in (x, y):
            shared.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        """Double shared values in a child and verify the results here.

        *lock* is forwarded to every Value/Array constructor.
        """
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        child = self.Process(target=self._double,
                             args=(x, y, foo, arr, string))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario, this time with lock=True.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        """A copy() of a structure must not follow later changes to it."""
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1961
1962#
1963#
1964#
1965
class _TestFinalize(BaseTestCase):
    """Check which util.Finalize callbacks fire, and in which order."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Runs in the child: every callback that fires sends its tag on conn.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing exitpriority 0, registered in this order.
        # The referents are kept in a list so that they stay alive until the
        # process exits, exactly like individually named locals would.
        kept_alive = []
        for tag in ('d01', 'd02', 'd03'):
            referent = Foo()
            kept_alive.append(referent)
            util.Finalize(referent, conn.send, args=(tag,), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        """Run the child and compare the tags it sent before 'STOP'."""
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.daemon = True
        child.start()
        child.join()

        received = list(iter(conn.recv, 'STOP'))
        self.assertEqual(received,
                         ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2018
2019#
2020# Test that from ... import * works for each module
2021#
2022
class _TestImportStar(BaseTestCase):
    """Check that every name listed in a module's __all__ really exists."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.synchronize', 'multiprocessing.util',
            ]

        # Optional modules are only checked when they are available.
        if HAS_REDUCTION:
            modules += ['multiprocessing.reduction']

        if c_int is not None:
            # This module requires _ctypes
            modules += ['multiprocessing.sharedctypes']

        for name in modules:
            __import__(name)
            module = sys.modules[name]

            exported = getattr(module, '__all__', ())
            for attr in exported:
                message = '%r does not have attribute %r' % (module, attr)
                self.assertTrue(hasattr(module, attr), message)
2051
2052#
2053# Quick test that logging works -- does not test logging output
2054#
2055
class _TestLogging(BaseTestCase):
    """Quick checks of the multiprocessing logger; output is not inspected."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        """The logger exists and accepts messages below its level."""
        logger = multiprocessing.get_logger()
        self.assertIsNotNone(logger)
        # Restore the module-wide level even if a call below raises.
        self.addCleanup(logger.setLevel, LOG_LEVEL)
        logger.setLevel(util.SUBWARNING)
        logger.debug('this will not be printed')
        logger.info('nor will this')

    @classmethod
    def _test_level(cls, conn):
        # Child-side body: report the logger's effective level to the parent.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _assert_child_level(self, expected, reader, writer):
        # Start a child running _test_level() and compare what it reports
        # with *expected*; the child is always joined, pass or fail.
        child = self.Process(target=self._test_level, args=(writer,))
        child.daemon = True
        child.start()
        try:
            self.assertEqual(expected, reader.recv())
        finally:
            child.join()

    def test_level(self):
        """A child sees the effective logger level that was set in the parent.

        The first check sets the level on the multiprocessing logger itself;
        the second resets it to NOTSET and sets the root logger instead.
        """
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level
        # Cleanups run in reverse order of registration: the root logger is
        # restored first, then the multiprocessing logger, whatever happens.
        self.addCleanup(logger.setLevel, level=LOG_LEVEL)
        self.addCleanup(root_logger.setLevel, root_level)

        reader, writer = multiprocessing.Pipe(duplex=False)
        self.addCleanup(reader.close)
        self.addCleanup(writer.close)

        logger.setLevel(LEVEL1)
        self._assert_child_level(LEVEL1, reader, writer)

        logger.setLevel(logging.NOTSET)
        root_logger.setLevel(LEVEL2)
        self._assert_child_level(LEVEL2, reader, writer)
2098
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002099
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002100# class _TestLoggingProcessName(BaseTestCase):
2101#
2102# def handle(self, record):
2103# assert record.processName == multiprocessing.current_process().name
2104# self.__handled = True
2105#
2106# def test_logging(self):
2107# handler = logging.Handler()
2108# handler.handle = self.handle
2109# self.__handled = False
2110# # Bypass getLogger() and side-effects
2111# logger = logging.getLoggerClass()(
2112# 'multiprocessing.test.TestLoggingProcessName')
2113# logger.addHandler(handler)
2114# logger.propagate = False
2115#
2116# logger.warn('foo')
2117# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002118
Benjamin Petersone711caf2008-06-11 16:44:04 +00002119#
Jesse Noller6214edd2009-01-19 16:23:53 +00002120# Test to verify handle verification, see issue 3321
2121#
2122
class TestInvalidHandle(unittest.TestCase):
    """Connection objects must reject handles that are not valid."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        rejected = (ValueError, IOError)
        connection_type = multiprocessing.connection.Connection

        conn = connection_type(44977608)
        try:
            with self.assertRaises(rejected):
                conn.poll()
        finally:
            # Hack private attribute _handle to avoid printing an error
            # in conn.__del__
            conn._handle = None

        with self.assertRaises(rejected):
            connection_type(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002136
Jesse Noller6214edd2009-01-19 16:23:53 +00002137#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002138# Functions used to create test cases from the base ones in this module
2139#
2140
def get_attributes(Source, names):
    """Return a dict mapping each of *names* to that attribute of *Source*.

    Attributes that are plain Python functions are wrapped in staticmethod,
    so that storing the result in a class namespace does not turn them into
    methods.  Every other attribute is stored unchanged.

    Raises AttributeError if *Source* lacks one of the names.
    """
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        obj = getattr(Source, name)
        if isinstance(obj, function_type):
            obj = staticmethod(obj)
        attributes[name] = obj
    return attributes
2149
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from this module's ``_Test*`` bases.

    For every module global whose name starts with ``_Test`` and whose
    ALLOWED_TYPES contains *type*, a class deriving from that base,
    unittest.TestCase and *Mixin* is created.  It is named
    ``With<Type><name without the leading underscore>`` and reports
    *Mixin*'s module as its own.  Returns a dict mapping the new names to
    the new classes.
    """
    prefix = 'With' + type.capitalize()
    cases = {}

    # Iterate over a snapshot of the module globals.
    for name, base in list(globals().items()):
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        Temp.__name__ = prefix + name[1:]
        Temp.__module__ = Mixin.__module__
        cases[Temp.__name__] = Temp

    return cases
2166
2167#
2168# Create test cases
2169#
2170
class ProcessesMixin(object):
    # Mixin for the 'processes' flavour of the tests: the attributes below
    # are taken straight from the multiprocessing package.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed attributes into the class namespace; get_attributes()
    # wraps plain functions in staticmethod first.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

# Build the 'processes' test cases and publish them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2183
2184
class ManagerMixin(object):
    # Mixin for the 'manager' flavour of the tests: the attributes below are
    # looked up on a SyncManager instance.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocated without running __init__; test_main() calls __init__() and
    # start() on it before the tests run.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed manager attributes into the class namespace.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

# Build the 'manager' test cases and publish them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2197
2198
class ThreadsMixin(object):
    # Mixin for the 'threads' flavour of the tests: the attributes below are
    # taken from multiprocessing.dummy.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed attributes into the class namespace; get_attributes()
    # wraps plain functions in staticmethod first.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

# Build the 'threads' test cases and publish them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2211
class OtherTest(unittest.TestCase):
    """Authentication failures in the connection challenge handshake."""

    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        class _FakeConnection(object):
            # Always answers with bytes that are not a valid response.
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        class _FakeConnection(object):
            # First read yields the challenge marker, the second read yields
            # garbage, and any later read yields empty bytes.
            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
2240
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002241#
2242# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2243#
2244
def initializer(ns):
    """Add one to ``ns.test``.

    TestInitializers passes this as the initializer callback and then
    checks that the counter on the shared namespace went up by one.
    """
    ns.test += 1
2247
class TestInitializers(unittest.TestCase):
    """Initializer callbacks of Manager.start() and Pool() -- issue 5585."""

    def setUp(self):
        # A manager-backed namespace lets other processes bump the counter.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        """start() rejects a non-callable and runs a callable initializer."""
        m = multiprocessing.managers.SyncManager()
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the extra manager down even when the assertion fails, so
            # that its process does not outlive the test.
            m.shutdown()

    def test_pool_initializer(self):
        """Pool() rejects a non-callable and runs a callable initializer."""
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2270
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002271#
2272# Issue 5155, 5313, 5331: Test process in processes
2273# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2274#
2275
2276def _ThisSubProcess(q):
2277 try:
2278 item = q.get(block=False)
2279 except pyqueue.Empty:
2280 pass
2281
def _TestProcess(q):
    """Run _ThisSubProcess in a daemonic child process and wait for it.

    The child is handed a freshly created multiprocessing.Queue; the *q*
    argument itself is not used.
    """
    child_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_ThisSubProcess,
                                    args=(child_queue,))
    child.daemon = True
    child.start()
    child.join()
2288
2289def _afunc(x):
2290 return x*x
2291
def pool_in_process():
    """Map _afunc over a few integers with a four-worker pool.

    The pool is always closed and joined afterwards so that its workers do
    not outlive this function.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        pool.close()
        pool.join()
2295
2296class _file_like(object):
2297 def __init__(self, delegate):
2298 self._delegate = delegate
2299 self._pid = None
2300
2301 @property
2302 def cache(self):
2303 pid = os.getpid()
2304 # There are no race conditions since fork keeps only the running thread
2305 if pid != self._pid:
2306 self._pid = pid
2307 self._cache = []
2308 return self._cache
2309
2310 def write(self, data):
2311 self.cache.append(data)
2312
2313 def flush(self):
2314 self._delegate.write(''.join(self.cache))
2315 self._cache = []
2316
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Processes started from within processes (issues 5155, 5313, 5331)."""

    def test_queue_in_process(self):
        # A child process that itself starts a process using a queue.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # A child process that itself creates a pool.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this process object is created but never started;
        # kept as in the original -- confirm whether it was meant to run.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # assertEqual instead of a bare assert: it still runs under -O and
        # reports both values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
2337
# Stand-alone test cases (not generated by create_test_cases); test_main()
# appends them after the generated ones.
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
                   TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002340
Benjamin Petersone711caf2008-06-11 16:44:04 +00002341#
2342#
2343#
2344
def test_main(run=None):
    """Set up the shared pools and manager, run every test case, clean up.

    *run* is called with the assembled unittest.TestSuite; when it is None,
    test.support.run_unittest is used.  On Linux, unittest.SkipTest is
    raised if an RLock cannot be created (issue 3111).  The pools and the
    manager are torn down even when *run* raises.
    """
    if sys.platform.startswith("linux"):
        try:
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # Always release the worker processes/threads and the manager, so a
        # failing run does not leave them behind.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002382
def main():
    """Run the whole suite through a verbose text test runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()