blob: 48a4ff467408b4d24e1b480e690ff65f49347beb [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000027# import threading after _multiprocessing to raise a more revelant error
28# message: "No module named _multiprocessing". _multiprocessing is not compiled
29# without thread support.
30import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.dummy
33import multiprocessing.connection
34import multiprocessing.managers
35import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
Charles-François Natalibc8f0822011-09-20 20:36:51 +020038from multiprocessing import util
39
40try:
41 from multiprocessing import reduction
42 HAS_REDUCTION = True
43except ImportError:
44 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000045
Brian Curtinafa88b52010-10-07 01:12:19 +000046try:
47 from multiprocessing.sharedctypes import Value, copy
48 HAS_SHAREDCTYPES = True
49except ImportError:
50 HAS_SHAREDCTYPES = False
51
Antoine Pitroubcb39d42011-08-23 19:46:22 +020052try:
53 import msvcrt
54except ImportError:
55 msvcrt = None
56
Benjamin Petersone711caf2008-06-11 16:44:04 +000057#
58#
59#
60
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000061def latin(s):
62 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000063
Benjamin Petersone711caf2008-06-11 16:44:04 +000064#
65# Constants
66#
67
68LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000069#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000070
71DELTA = 0.1
72CHECK_TIMINGS = False # making true makes tests take a lot longer
73 # and can sometimes cause some non-serious
74 # failures because some calls block a bit
75 # longer than expected
76if CHECK_TIMINGS:
77 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
78else:
79 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
80
81HAVE_GETVALUE = not getattr(_multiprocessing,
82 'HAVE_BROKEN_SEM_GETVALUE', False)
83
Jesse Noller6214edd2009-01-19 16:23:53 +000084WIN32 = (sys.platform == "win32")
Antoine Pitrou176f07d2011-06-06 19:35:31 +020085if WIN32:
86 from _subprocess import WaitForSingleObject, INFINITE, WAIT_OBJECT_0
87
88 def wait_for_handle(handle, timeout):
89 if timeout is None or timeout < 0.0:
90 timeout = INFINITE
91 else:
92 timeout = int(1000 * timeout)
93 return WaitForSingleObject(handle, timeout) == WAIT_OBJECT_0
94else:
95 from select import select
96 _select = util._eintr_retry(select)
97
98 def wait_for_handle(handle, timeout):
99 if timeout is not None and timeout < 0.0:
100 timeout = None
101 return handle in _select([handle], [], [], timeout)[0]
Jesse Noller6214edd2009-01-19 16:23:53 +0000102
Antoine Pitroubcb39d42011-08-23 19:46:22 +0200103try:
104 MAXFD = os.sysconf("SC_OPEN_MAX")
105except:
106 MAXFD = 256
107
Benjamin Petersone711caf2008-06-11 16:44:04 +0000108#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000109# Some tests require ctypes
110#
111
112try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000113 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000114except ImportError:
115 Structure = object
116 c_int = c_double = None
117
118#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000119# Creates a wrapper for a function which records the time it takes to finish
120#
121
122class TimingWrapper(object):
123
124 def __init__(self, func):
125 self.func = func
126 self.elapsed = None
127
128 def __call__(self, *args, **kwds):
129 t = time.time()
130 try:
131 return self.func(*args, **kwds)
132 finally:
133 self.elapsed = time.time() - t
134
135#
136# Base class for test cases
137#
138
139class BaseTestCase(object):
140
141 ALLOWED_TYPES = ('processes', 'manager', 'threads')
142
143 def assertTimingAlmostEqual(self, a, b):
144 if CHECK_TIMINGS:
145 self.assertAlmostEqual(a, b, 1)
146
147 def assertReturnsIfImplemented(self, value, func, *args):
148 try:
149 res = func(*args)
150 except NotImplementedError:
151 pass
152 else:
153 return self.assertEqual(value, res)
154
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000155 # For the sanity of Windows users, rather than crashing or freezing in
156 # multiple ways.
157 def __reduce__(self, *args):
158 raise NotImplementedError("shouldn't try to pickle a test case")
159
160 __reduce_ex__ = __reduce__
161
Benjamin Petersone711caf2008-06-11 16:44:04 +0000162#
163# Return the value of a semaphore
164#
165
166def get_value(self):
167 try:
168 return self.get_value()
169 except AttributeError:
170 try:
171 return self._Semaphore__value
172 except AttributeError:
173 try:
174 return self._value
175 except AttributeError:
176 raise NotImplementedError
177
178#
179# Testcases
180#
181
182class _TestProcess(BaseTestCase):
183
184 ALLOWED_TYPES = ('processes', 'threads')
185
186 def test_current(self):
187 if self.TYPE == 'threads':
188 return
189
190 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000191 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000192
193 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000194 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000195 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000196 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000197 self.assertEqual(current.ident, os.getpid())
198 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000199
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000200 def test_daemon_argument(self):
201 if self.TYPE == "threads":
202 return
203
204 # By default uses the current process's daemon flag.
205 proc0 = self.Process(target=self._test)
Antoine Pitrouec785222011-03-02 00:15:44 +0000206 self.assertEqual(proc0.daemon, self.current_process().daemon)
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000207 proc1 = self.Process(target=self._test, daemon=True)
208 self.assertTrue(proc1.daemon)
209 proc2 = self.Process(target=self._test, daemon=False)
210 self.assertFalse(proc2.daemon)
211
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000212 @classmethod
213 def _test(cls, q, *args, **kwds):
214 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000215 q.put(args)
216 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000217 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000218 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000219 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000220 q.put(current.pid)
221
222 def test_process(self):
223 q = self.Queue(1)
224 e = self.Event()
225 args = (q, 1, 2)
226 kwargs = {'hello':23, 'bye':2.54}
227 name = 'SomeProcess'
228 p = self.Process(
229 target=self._test, args=args, kwargs=kwargs, name=name
230 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000231 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000232 current = self.current_process()
233
234 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000235 self.assertEqual(p.authkey, current.authkey)
236 self.assertEqual(p.is_alive(), False)
237 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000238 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000239 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000240 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000241
242 p.start()
243
Ezio Melottib3aedd42010-11-20 19:04:17 +0000244 self.assertEqual(p.exitcode, None)
245 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000246 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000247
Ezio Melottib3aedd42010-11-20 19:04:17 +0000248 self.assertEqual(q.get(), args[1:])
249 self.assertEqual(q.get(), kwargs)
250 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000251 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000252 self.assertEqual(q.get(), current.authkey)
253 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000254
255 p.join()
256
Ezio Melottib3aedd42010-11-20 19:04:17 +0000257 self.assertEqual(p.exitcode, 0)
258 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000259 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000260
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000261 @classmethod
262 def _test_terminate(cls):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000263 time.sleep(1000)
264
265 def test_terminate(self):
266 if self.TYPE == 'threads':
267 return
268
269 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000270 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000271 p.start()
272
273 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000274 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000275 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000276
277 p.terminate()
278
279 join = TimingWrapper(p.join)
280 self.assertEqual(join(), None)
281 self.assertTimingAlmostEqual(join.elapsed, 0.0)
282
283 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000284 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000285
286 p.join()
287
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000288 # XXX sometimes get p.exitcode == 0 on Windows ...
289 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000290
291 def test_cpu_count(self):
292 try:
293 cpus = multiprocessing.cpu_count()
294 except NotImplementedError:
295 cpus = 1
296 self.assertTrue(type(cpus) is int)
297 self.assertTrue(cpus >= 1)
298
299 def test_active_children(self):
300 self.assertEqual(type(self.active_children()), list)
301
302 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000303 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000304
Jesus Cea94f964f2011-09-09 20:26:57 +0200305 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000306 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000307 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000308
309 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000310 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000311
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000312 @classmethod
313 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000314 from multiprocessing import forking
315 wconn.send(id)
316 if len(id) < 2:
317 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000318 p = cls.Process(
319 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000320 )
321 p.start()
322 p.join()
323
324 def test_recursion(self):
325 rconn, wconn = self.Pipe(duplex=False)
326 self._test_recursion(wconn, [])
327
328 time.sleep(DELTA)
329 result = []
330 while rconn.poll():
331 result.append(rconn.recv())
332
333 expected = [
334 [],
335 [0],
336 [0, 0],
337 [0, 1],
338 [1],
339 [1, 0],
340 [1, 1]
341 ]
342 self.assertEqual(result, expected)
343
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200344 @classmethod
345 def _test_sentinel(cls, event):
346 event.wait(10.0)
347
348 def test_sentinel(self):
349 if self.TYPE == "threads":
350 return
351 event = self.Event()
352 p = self.Process(target=self._test_sentinel, args=(event,))
353 with self.assertRaises(ValueError):
354 p.sentinel
355 p.start()
356 self.addCleanup(p.join)
357 sentinel = p.sentinel
358 self.assertIsInstance(sentinel, int)
359 self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
360 event.set()
361 p.join()
362 self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
363
Benjamin Petersone711caf2008-06-11 16:44:04 +0000364#
365#
366#
367
368class _UpperCaser(multiprocessing.Process):
369
370 def __init__(self):
371 multiprocessing.Process.__init__(self)
372 self.child_conn, self.parent_conn = multiprocessing.Pipe()
373
374 def run(self):
375 self.parent_conn.close()
376 for s in iter(self.child_conn.recv, None):
377 self.child_conn.send(s.upper())
378 self.child_conn.close()
379
380 def submit(self, s):
381 assert type(s) is str
382 self.parent_conn.send(s)
383 return self.parent_conn.recv()
384
385 def stop(self):
386 self.parent_conn.send(None)
387 self.parent_conn.close()
388 self.child_conn.close()
389
390class _TestSubclassingProcess(BaseTestCase):
391
392 ALLOWED_TYPES = ('processes',)
393
394 def test_subclassing(self):
395 uppercaser = _UpperCaser()
Jesus Cea94f964f2011-09-09 20:26:57 +0200396 uppercaser.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000397 uppercaser.start()
398 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
399 self.assertEqual(uppercaser.submit('world'), 'WORLD')
400 uppercaser.stop()
401 uppercaser.join()
402
403#
404#
405#
406
407def queue_empty(q):
408 if hasattr(q, 'empty'):
409 return q.empty()
410 else:
411 return q.qsize() == 0
412
413def queue_full(q, maxsize):
414 if hasattr(q, 'full'):
415 return q.full()
416 else:
417 return q.qsize() == maxsize
418
419
420class _TestQueue(BaseTestCase):
421
422
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000423 @classmethod
424 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000425 child_can_start.wait()
426 for i in range(6):
427 queue.get()
428 parent_can_continue.set()
429
430 def test_put(self):
431 MAXSIZE = 6
432 queue = self.Queue(maxsize=MAXSIZE)
433 child_can_start = self.Event()
434 parent_can_continue = self.Event()
435
436 proc = self.Process(
437 target=self._test_put,
438 args=(queue, child_can_start, parent_can_continue)
439 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000440 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000441 proc.start()
442
443 self.assertEqual(queue_empty(queue), True)
444 self.assertEqual(queue_full(queue, MAXSIZE), False)
445
446 queue.put(1)
447 queue.put(2, True)
448 queue.put(3, True, None)
449 queue.put(4, False)
450 queue.put(5, False, None)
451 queue.put_nowait(6)
452
453 # the values may be in buffer but not yet in pipe so sleep a bit
454 time.sleep(DELTA)
455
456 self.assertEqual(queue_empty(queue), False)
457 self.assertEqual(queue_full(queue, MAXSIZE), True)
458
459 put = TimingWrapper(queue.put)
460 put_nowait = TimingWrapper(queue.put_nowait)
461
462 self.assertRaises(pyqueue.Full, put, 7, False)
463 self.assertTimingAlmostEqual(put.elapsed, 0)
464
465 self.assertRaises(pyqueue.Full, put, 7, False, None)
466 self.assertTimingAlmostEqual(put.elapsed, 0)
467
468 self.assertRaises(pyqueue.Full, put_nowait, 7)
469 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
470
471 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
472 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
473
474 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
475 self.assertTimingAlmostEqual(put.elapsed, 0)
476
477 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
478 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
479
480 child_can_start.set()
481 parent_can_continue.wait()
482
483 self.assertEqual(queue_empty(queue), True)
484 self.assertEqual(queue_full(queue, MAXSIZE), False)
485
486 proc.join()
487
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000488 @classmethod
489 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000490 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000491 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000492 queue.put(2)
493 queue.put(3)
494 queue.put(4)
495 queue.put(5)
496 parent_can_continue.set()
497
498 def test_get(self):
499 queue = self.Queue()
500 child_can_start = self.Event()
501 parent_can_continue = self.Event()
502
503 proc = self.Process(
504 target=self._test_get,
505 args=(queue, child_can_start, parent_can_continue)
506 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000507 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000508 proc.start()
509
510 self.assertEqual(queue_empty(queue), True)
511
512 child_can_start.set()
513 parent_can_continue.wait()
514
515 time.sleep(DELTA)
516 self.assertEqual(queue_empty(queue), False)
517
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000518 # Hangs unexpectedly, remove for now
519 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000520 self.assertEqual(queue.get(True, None), 2)
521 self.assertEqual(queue.get(True), 3)
522 self.assertEqual(queue.get(timeout=1), 4)
523 self.assertEqual(queue.get_nowait(), 5)
524
525 self.assertEqual(queue_empty(queue), True)
526
527 get = TimingWrapper(queue.get)
528 get_nowait = TimingWrapper(queue.get_nowait)
529
530 self.assertRaises(pyqueue.Empty, get, False)
531 self.assertTimingAlmostEqual(get.elapsed, 0)
532
533 self.assertRaises(pyqueue.Empty, get, False, None)
534 self.assertTimingAlmostEqual(get.elapsed, 0)
535
536 self.assertRaises(pyqueue.Empty, get_nowait)
537 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
538
539 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
540 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
541
542 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
543 self.assertTimingAlmostEqual(get.elapsed, 0)
544
545 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
546 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
547
548 proc.join()
549
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000550 @classmethod
551 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000552 for i in range(10, 20):
553 queue.put(i)
554 # note that at this point the items may only be buffered, so the
555 # process cannot shutdown until the feeder thread has finished
556 # pushing items onto the pipe.
557
558 def test_fork(self):
559 # Old versions of Queue would fail to create a new feeder
560 # thread for a forked process if the original process had its
561 # own feeder thread. This test checks that this no longer
562 # happens.
563
564 queue = self.Queue()
565
566 # put items on queue so that main process starts a feeder thread
567 for i in range(10):
568 queue.put(i)
569
570 # wait to make sure thread starts before we fork a new process
571 time.sleep(DELTA)
572
573 # fork process
574 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200575 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000576 p.start()
577
578 # check that all expected items are in the queue
579 for i in range(20):
580 self.assertEqual(queue.get(), i)
581 self.assertRaises(pyqueue.Empty, queue.get, False)
582
583 p.join()
584
585 def test_qsize(self):
586 q = self.Queue()
587 try:
588 self.assertEqual(q.qsize(), 0)
589 except NotImplementedError:
590 return
591 q.put(1)
592 self.assertEqual(q.qsize(), 1)
593 q.put(5)
594 self.assertEqual(q.qsize(), 2)
595 q.get()
596 self.assertEqual(q.qsize(), 1)
597 q.get()
598 self.assertEqual(q.qsize(), 0)
599
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000600 @classmethod
601 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000602 for obj in iter(q.get, None):
603 time.sleep(DELTA)
604 q.task_done()
605
606 def test_task_done(self):
607 queue = self.JoinableQueue()
608
609 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000610 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000611
612 workers = [self.Process(target=self._test_task_done, args=(queue,))
613 for i in range(4)]
614
615 for p in workers:
Jesus Cea94f964f2011-09-09 20:26:57 +0200616 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000617 p.start()
618
619 for i in range(10):
620 queue.put(i)
621
622 queue.join()
623
624 for p in workers:
625 queue.put(None)
626
627 for p in workers:
628 p.join()
629
630#
631#
632#
633
634class _TestLock(BaseTestCase):
635
636 def test_lock(self):
637 lock = self.Lock()
638 self.assertEqual(lock.acquire(), True)
639 self.assertEqual(lock.acquire(False), False)
640 self.assertEqual(lock.release(), None)
641 self.assertRaises((ValueError, threading.ThreadError), lock.release)
642
643 def test_rlock(self):
644 lock = self.RLock()
645 self.assertEqual(lock.acquire(), True)
646 self.assertEqual(lock.acquire(), True)
647 self.assertEqual(lock.acquire(), True)
648 self.assertEqual(lock.release(), None)
649 self.assertEqual(lock.release(), None)
650 self.assertEqual(lock.release(), None)
651 self.assertRaises((AssertionError, RuntimeError), lock.release)
652
Jesse Nollerf8d00852009-03-31 03:25:07 +0000653 def test_lock_context(self):
654 with self.Lock():
655 pass
656
Benjamin Petersone711caf2008-06-11 16:44:04 +0000657
658class _TestSemaphore(BaseTestCase):
659
660 def _test_semaphore(self, sem):
661 self.assertReturnsIfImplemented(2, get_value, sem)
662 self.assertEqual(sem.acquire(), True)
663 self.assertReturnsIfImplemented(1, get_value, sem)
664 self.assertEqual(sem.acquire(), True)
665 self.assertReturnsIfImplemented(0, get_value, sem)
666 self.assertEqual(sem.acquire(False), False)
667 self.assertReturnsIfImplemented(0, get_value, sem)
668 self.assertEqual(sem.release(), None)
669 self.assertReturnsIfImplemented(1, get_value, sem)
670 self.assertEqual(sem.release(), None)
671 self.assertReturnsIfImplemented(2, get_value, sem)
672
673 def test_semaphore(self):
674 sem = self.Semaphore(2)
675 self._test_semaphore(sem)
676 self.assertEqual(sem.release(), None)
677 self.assertReturnsIfImplemented(3, get_value, sem)
678 self.assertEqual(sem.release(), None)
679 self.assertReturnsIfImplemented(4, get_value, sem)
680
681 def test_bounded_semaphore(self):
682 sem = self.BoundedSemaphore(2)
683 self._test_semaphore(sem)
684 # Currently fails on OS/X
685 #if HAVE_GETVALUE:
686 # self.assertRaises(ValueError, sem.release)
687 # self.assertReturnsIfImplemented(2, get_value, sem)
688
689 def test_timeout(self):
690 if self.TYPE != 'processes':
691 return
692
693 sem = self.Semaphore(0)
694 acquire = TimingWrapper(sem.acquire)
695
696 self.assertEqual(acquire(False), False)
697 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
698
699 self.assertEqual(acquire(False, None), False)
700 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
701
702 self.assertEqual(acquire(False, TIMEOUT1), False)
703 self.assertTimingAlmostEqual(acquire.elapsed, 0)
704
705 self.assertEqual(acquire(True, TIMEOUT2), False)
706 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
707
708 self.assertEqual(acquire(timeout=TIMEOUT3), False)
709 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
710
711
712class _TestCondition(BaseTestCase):
713
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000714 @classmethod
715 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000716 cond.acquire()
717 sleeping.release()
718 cond.wait(timeout)
719 woken.release()
720 cond.release()
721
722 def check_invariant(self, cond):
723 # this is only supposed to succeed when there are no sleepers
724 if self.TYPE == 'processes':
725 try:
726 sleepers = (cond._sleeping_count.get_value() -
727 cond._woken_count.get_value())
728 self.assertEqual(sleepers, 0)
729 self.assertEqual(cond._wait_semaphore.get_value(), 0)
730 except NotImplementedError:
731 pass
732
733 def test_notify(self):
734 cond = self.Condition()
735 sleeping = self.Semaphore(0)
736 woken = self.Semaphore(0)
737
738 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000739 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000740 p.start()
741
742 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000743 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000744 p.start()
745
746 # wait for both children to start sleeping
747 sleeping.acquire()
748 sleeping.acquire()
749
750 # check no process/thread has woken up
751 time.sleep(DELTA)
752 self.assertReturnsIfImplemented(0, get_value, woken)
753
754 # wake up one process/thread
755 cond.acquire()
756 cond.notify()
757 cond.release()
758
759 # check one process/thread has woken up
760 time.sleep(DELTA)
761 self.assertReturnsIfImplemented(1, get_value, woken)
762
763 # wake up another
764 cond.acquire()
765 cond.notify()
766 cond.release()
767
768 # check other has woken up
769 time.sleep(DELTA)
770 self.assertReturnsIfImplemented(2, get_value, woken)
771
772 # check state is not mucked up
773 self.check_invariant(cond)
774 p.join()
775
776 def test_notify_all(self):
777 cond = self.Condition()
778 sleeping = self.Semaphore(0)
779 woken = self.Semaphore(0)
780
781 # start some threads/processes which will timeout
782 for i in range(3):
783 p = self.Process(target=self.f,
784 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000785 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000786 p.start()
787
788 t = threading.Thread(target=self.f,
789 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000790 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000791 t.start()
792
793 # wait for them all to sleep
794 for i in range(6):
795 sleeping.acquire()
796
797 # check they have all timed out
798 for i in range(6):
799 woken.acquire()
800 self.assertReturnsIfImplemented(0, get_value, woken)
801
802 # check state is not mucked up
803 self.check_invariant(cond)
804
805 # start some more threads/processes
806 for i in range(3):
807 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000808 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000809 p.start()
810
811 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000812 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000813 t.start()
814
815 # wait for them to all sleep
816 for i in range(6):
817 sleeping.acquire()
818
819 # check no process/thread has woken up
820 time.sleep(DELTA)
821 self.assertReturnsIfImplemented(0, get_value, woken)
822
823 # wake them all up
824 cond.acquire()
825 cond.notify_all()
826 cond.release()
827
828 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200829 for i in range(10):
830 try:
831 if get_value(woken) == 6:
832 break
833 except NotImplementedError:
834 break
835 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000836 self.assertReturnsIfImplemented(6, get_value, woken)
837
838 # check state is not mucked up
839 self.check_invariant(cond)
840
841 def test_timeout(self):
842 cond = self.Condition()
843 wait = TimingWrapper(cond.wait)
844 cond.acquire()
845 res = wait(TIMEOUT1)
846 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000847 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000848 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
849
850
851class _TestEvent(BaseTestCase):
852
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000853 @classmethod
854 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000855 time.sleep(TIMEOUT2)
856 event.set()
857
858 def test_event(self):
859 event = self.Event()
860 wait = TimingWrapper(event.wait)
861
Ezio Melotti13925002011-03-16 11:05:33 +0200862 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +0000863 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +0000864 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000865
Benjamin Peterson965ce872009-04-05 21:24:58 +0000866 # Removed, threading.Event.wait() will return the value of the __flag
867 # instead of None. API Shear with the semaphore backed mp.Event
868 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000869 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000870 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000871 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
872
873 event.set()
874
875 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +0000876 self.assertEqual(event.is_set(), True)
877 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000878 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000879 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000880 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
881 # self.assertEqual(event.is_set(), True)
882
883 event.clear()
884
885 #self.assertEqual(event.is_set(), False)
886
Jesus Cea94f964f2011-09-09 20:26:57 +0200887 p = self.Process(target=self._test_event, args=(event,))
888 p.daemon = True
889 p.start()
Benjamin Peterson965ce872009-04-05 21:24:58 +0000890 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000891
892#
893#
894#
895
896class _TestValue(BaseTestCase):
897
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000898 ALLOWED_TYPES = ('processes',)
899
Benjamin Petersone711caf2008-06-11 16:44:04 +0000900 codes_values = [
901 ('i', 4343, 24234),
902 ('d', 3.625, -4.25),
903 ('h', -232, 234),
904 ('c', latin('x'), latin('y'))
905 ]
906
Antoine Pitrou7744e2a2010-11-22 16:26:21 +0000907 def setUp(self):
908 if not HAS_SHAREDCTYPES:
909 self.skipTest("requires multiprocessing.sharedctypes")
910
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000911 @classmethod
912 def _test(cls, values):
913 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000914 sv.value = cv[2]
915
916
917 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000918 if raw:
919 values = [self.RawValue(code, value)
920 for code, value, _ in self.codes_values]
921 else:
922 values = [self.Value(code, value)
923 for code, value, _ in self.codes_values]
924
925 for sv, cv in zip(values, self.codes_values):
926 self.assertEqual(sv.value, cv[1])
927
928 proc = self.Process(target=self._test, args=(values,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200929 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000930 proc.start()
931 proc.join()
932
933 for sv, cv in zip(values, self.codes_values):
934 self.assertEqual(sv.value, cv[2])
935
936 def test_rawvalue(self):
937 self.test_value(raw=True)
938
939 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000940 val1 = self.Value('i', 5)
941 lock1 = val1.get_lock()
942 obj1 = val1.get_obj()
943
944 val2 = self.Value('i', 5, lock=None)
945 lock2 = val2.get_lock()
946 obj2 = val2.get_obj()
947
948 lock = self.Lock()
949 val3 = self.Value('i', 5, lock=lock)
950 lock3 = val3.get_lock()
951 obj3 = val3.get_obj()
952 self.assertEqual(lock, lock3)
953
Jesse Nollerb0516a62009-01-18 03:11:38 +0000954 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000955 self.assertFalse(hasattr(arr4, 'get_lock'))
956 self.assertFalse(hasattr(arr4, 'get_obj'))
957
Jesse Nollerb0516a62009-01-18 03:11:38 +0000958 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
959
960 arr5 = self.RawValue('i', 5)
961 self.assertFalse(hasattr(arr5, 'get_lock'))
962 self.assertFalse(hasattr(arr5, 'get_obj'))
963
Benjamin Petersone711caf2008-06-11 16:44:04 +0000964
965class _TestArray(BaseTestCase):
966
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000967 ALLOWED_TYPES = ('processes',)
968
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000969 @classmethod
970 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000971 for i in range(1, len(seq)):
972 seq[i] += seq[i-1]
973
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000974 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000975 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000976 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
977 if raw:
978 arr = self.RawArray('i', seq)
979 else:
980 arr = self.Array('i', seq)
981
982 self.assertEqual(len(arr), len(seq))
983 self.assertEqual(arr[3], seq[3])
984 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
985
986 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
987
988 self.assertEqual(list(arr[:]), seq)
989
990 self.f(seq)
991
992 p = self.Process(target=self.f, args=(arr,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200993 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000994 p.start()
995 p.join()
996
997 self.assertEqual(list(arr[:]), seq)
998
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000999 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +00001000 def test_array_from_size(self):
1001 size = 10
1002 # Test for zeroing (see issue #11675).
1003 # The repetition below strengthens the test by increasing the chances
1004 # of previously allocated non-zero memory being used for the new array
1005 # on the 2nd and 3rd loops.
1006 for _ in range(3):
1007 arr = self.Array('i', size)
1008 self.assertEqual(len(arr), size)
1009 self.assertEqual(list(arr), [0] * size)
1010 arr[:] = range(10)
1011 self.assertEqual(list(arr), list(range(10)))
1012 del arr
1013
1014 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001015 def test_rawarray(self):
1016 self.test_array(raw=True)
1017
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001018 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001019 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001020 arr1 = self.Array('i', list(range(10)))
1021 lock1 = arr1.get_lock()
1022 obj1 = arr1.get_obj()
1023
1024 arr2 = self.Array('i', list(range(10)), lock=None)
1025 lock2 = arr2.get_lock()
1026 obj2 = arr2.get_obj()
1027
1028 lock = self.Lock()
1029 arr3 = self.Array('i', list(range(10)), lock=lock)
1030 lock3 = arr3.get_lock()
1031 obj3 = arr3.get_obj()
1032 self.assertEqual(lock, lock3)
1033
Jesse Nollerb0516a62009-01-18 03:11:38 +00001034 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001035 self.assertFalse(hasattr(arr4, 'get_lock'))
1036 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +00001037 self.assertRaises(AttributeError,
1038 self.Array, 'i', range(10), lock='notalock')
1039
1040 arr5 = self.RawArray('i', range(10))
1041 self.assertFalse(hasattr(arr5, 'get_lock'))
1042 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001043
1044#
1045#
1046#
1047
1048class _TestContainers(BaseTestCase):
1049
1050 ALLOWED_TYPES = ('manager',)
1051
1052 def test_list(self):
1053 a = self.list(list(range(10)))
1054 self.assertEqual(a[:], list(range(10)))
1055
1056 b = self.list()
1057 self.assertEqual(b[:], [])
1058
1059 b.extend(list(range(5)))
1060 self.assertEqual(b[:], list(range(5)))
1061
1062 self.assertEqual(b[2], 2)
1063 self.assertEqual(b[2:10], [2,3,4])
1064
1065 b *= 2
1066 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1067
1068 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1069
1070 self.assertEqual(a[:], list(range(10)))
1071
1072 d = [a, b]
1073 e = self.list(d)
1074 self.assertEqual(
1075 e[:],
1076 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1077 )
1078
1079 f = self.list([a])
1080 a.append('hello')
1081 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1082
1083 def test_dict(self):
1084 d = self.dict()
1085 indices = list(range(65, 70))
1086 for i in indices:
1087 d[i] = chr(i)
1088 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1089 self.assertEqual(sorted(d.keys()), indices)
1090 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1091 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1092
1093 def test_namespace(self):
1094 n = self.Namespace()
1095 n.name = 'Bob'
1096 n.job = 'Builder'
1097 n._hidden = 'hidden'
1098 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1099 del n.job
1100 self.assertEqual(str(n), "Namespace(name='Bob')")
1101 self.assertTrue(hasattr(n, 'name'))
1102 self.assertTrue(not hasattr(n, 'job'))
1103
1104#
1105#
1106#
1107
1108def sqr(x, wait=0.0):
1109 time.sleep(wait)
1110 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00001111
Benjamin Petersone711caf2008-06-11 16:44:04 +00001112class _TestPool(BaseTestCase):
1113
1114 def test_apply(self):
1115 papply = self.pool.apply
1116 self.assertEqual(papply(sqr, (5,)), sqr(5))
1117 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1118
1119 def test_map(self):
1120 pmap = self.pool.map
1121 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1122 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1123 list(map(sqr, list(range(100)))))
1124
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001125 def test_map_chunksize(self):
1126 try:
1127 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1128 except multiprocessing.TimeoutError:
1129 self.fail("pool.map_async with chunksize stalled on null list")
1130
Benjamin Petersone711caf2008-06-11 16:44:04 +00001131 def test_async(self):
1132 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1133 get = TimingWrapper(res.get)
1134 self.assertEqual(get(), 49)
1135 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1136
1137 def test_async_timeout(self):
1138 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1139 get = TimingWrapper(res.get)
1140 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1141 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1142
1143 def test_imap(self):
1144 it = self.pool.imap(sqr, list(range(10)))
1145 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1146
1147 it = self.pool.imap(sqr, list(range(10)))
1148 for i in range(10):
1149 self.assertEqual(next(it), i*i)
1150 self.assertRaises(StopIteration, it.__next__)
1151
1152 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1153 for i in range(1000):
1154 self.assertEqual(next(it), i*i)
1155 self.assertRaises(StopIteration, it.__next__)
1156
1157 def test_imap_unordered(self):
1158 it = self.pool.imap_unordered(sqr, list(range(1000)))
1159 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1160
1161 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1162 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1163
1164 def test_make_pool(self):
Victor Stinner2fae27b2011-06-20 17:53:35 +02001165 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1166 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1167
Benjamin Petersone711caf2008-06-11 16:44:04 +00001168 p = multiprocessing.Pool(3)
1169 self.assertEqual(3, len(p._pool))
1170 p.close()
1171 p.join()
1172
1173 def test_terminate(self):
1174 if self.TYPE == 'manager':
1175 # On Unix a forked process increfs each shared object to
1176 # which its parent process held a reference. If the
1177 # forked process gets terminated then there is likely to
1178 # be a reference leak. So to prevent
1179 # _TestZZZNumberOfObjects from failing we skip this test
1180 # when using a manager.
1181 return
1182
1183 result = self.pool.map_async(
1184 time.sleep, [0.1 for i in range(10000)], chunksize=1
1185 )
1186 self.pool.terminate()
1187 join = TimingWrapper(self.pool.join)
1188 join()
Victor Stinner900189b2011-03-24 16:39:07 +01001189 self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001190
Ask Solem2afcbf22010-11-09 20:55:52 +00001191def raising():
1192 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00001193
Ask Solem2afcbf22010-11-09 20:55:52 +00001194def unpickleable_result():
1195 return lambda: 42
1196
1197class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00001198 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00001199
1200 def test_async_error_callback(self):
1201 p = multiprocessing.Pool(2)
1202
1203 scratchpad = [None]
1204 def errback(exc):
1205 scratchpad[0] = exc
1206
1207 res = p.apply_async(raising, error_callback=errback)
1208 self.assertRaises(KeyError, res.get)
1209 self.assertTrue(scratchpad[0])
1210 self.assertIsInstance(scratchpad[0], KeyError)
1211
1212 p.close()
1213 p.join()
1214
1215 def test_unpickleable_result(self):
1216 from multiprocessing.pool import MaybeEncodingError
1217 p = multiprocessing.Pool(2)
1218
1219 # Make sure we don't lose pool processes because of encoding errors.
1220 for iteration in range(20):
1221
1222 scratchpad = [None]
1223 def errback(exc):
1224 scratchpad[0] = exc
1225
1226 res = p.apply_async(unpickleable_result, error_callback=errback)
1227 self.assertRaises(MaybeEncodingError, res.get)
1228 wrapped = scratchpad[0]
1229 self.assertTrue(wrapped)
1230 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
1231 self.assertIsNotNone(wrapped.exc)
1232 self.assertIsNotNone(wrapped.value)
1233
1234 p.close()
1235 p.join()
1236
1237class _TestPoolWorkerLifetime(BaseTestCase):
1238 ALLOWED_TYPES = ('processes', )
1239
Jesse Noller1f0b6582010-01-27 03:36:01 +00001240 def test_pool_worker_lifetime(self):
1241 p = multiprocessing.Pool(3, maxtasksperchild=10)
1242 self.assertEqual(3, len(p._pool))
1243 origworkerpids = [w.pid for w in p._pool]
1244 # Run many tasks so each worker gets replaced (hopefully)
1245 results = []
1246 for i in range(100):
1247 results.append(p.apply_async(sqr, (i, )))
1248 # Fetch the results and verify we got the right answers,
1249 # also ensuring all the tasks have completed.
1250 for (j, res) in enumerate(results):
1251 self.assertEqual(res.get(), sqr(j))
1252 # Refill the pool
1253 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001254 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02001255 # (countdown * DELTA = 5 seconds max startup process time)
1256 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00001257 while countdown and not all(w.is_alive() for w in p._pool):
1258 countdown -= 1
1259 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001260 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001261 # All pids should be assigned. See issue #7805.
1262 self.assertNotIn(None, origworkerpids)
1263 self.assertNotIn(None, finalworkerpids)
1264 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001265 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1266 p.close()
1267 p.join()
1268
Benjamin Petersone711caf2008-06-11 16:44:04 +00001269#
1270# Test that manager has expected number of shared objects left
1271#
1272
1273class _TestZZZNumberOfObjects(BaseTestCase):
1274 # Because test cases are sorted alphabetically, this one will get
1275 # run after all the other tests for the manager. It tests that
1276 # there have been no "reference leaks" for the manager's shared
1277 # objects. Note the comment in _TestPool.test_terminate().
1278 ALLOWED_TYPES = ('manager',)
1279
1280 def test_number_of_objects(self):
1281 EXPECTED_NUMBER = 1 # the pool object is still alive
1282 multiprocessing.active_children() # discard dead process objs
1283 gc.collect() # do garbage collection
1284 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001285 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001286 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001287 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001288 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001289
1290 self.assertEqual(refs, EXPECTED_NUMBER)
1291
1292#
1293# Test of creating a customized manager class
1294#
1295
1296from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1297
1298class FooBar(object):
1299 def f(self):
1300 return 'f()'
1301 def g(self):
1302 raise ValueError
1303 def _h(self):
1304 return '_h()'
1305
1306def baz():
1307 for i in range(10):
1308 yield i*i
1309
1310class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00001311 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001312 def __iter__(self):
1313 return self
1314 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001315 return self._callmethod('__next__')
1316
1317class MyManager(BaseManager):
1318 pass
1319
1320MyManager.register('Foo', callable=FooBar)
1321MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1322MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1323
1324
1325class _TestMyManager(BaseTestCase):
1326
1327 ALLOWED_TYPES = ('manager',)
1328
1329 def test_mymanager(self):
1330 manager = MyManager()
1331 manager.start()
1332
1333 foo = manager.Foo()
1334 bar = manager.Bar()
1335 baz = manager.baz()
1336
1337 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1338 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1339
1340 self.assertEqual(foo_methods, ['f', 'g'])
1341 self.assertEqual(bar_methods, ['f', '_h'])
1342
1343 self.assertEqual(foo.f(), 'f()')
1344 self.assertRaises(ValueError, foo.g)
1345 self.assertEqual(foo._callmethod('f'), 'f()')
1346 self.assertRaises(RemoteError, foo._callmethod, '_h')
1347
1348 self.assertEqual(bar.f(), 'f()')
1349 self.assertEqual(bar._h(), '_h()')
1350 self.assertEqual(bar._callmethod('f'), 'f()')
1351 self.assertEqual(bar._callmethod('_h'), '_h()')
1352
1353 self.assertEqual(list(baz), [i*i for i in range(10)])
1354
1355 manager.shutdown()
1356
1357#
1358# Test of connecting to a remote server and using xmlrpclib for serialization
1359#
1360
1361_queue = pyqueue.Queue()
1362def get_queue():
1363 return _queue
1364
1365class QueueManager(BaseManager):
1366 '''manager class used by server process'''
1367QueueManager.register('get_queue', callable=get_queue)
1368
1369class QueueManager2(BaseManager):
1370 '''manager class which specifies the same interface as QueueManager'''
1371QueueManager2.register('get_queue')
1372
1373
1374SERIALIZER = 'xmlrpclib'
1375
1376class _TestRemoteManager(BaseTestCase):
1377
1378 ALLOWED_TYPES = ('manager',)
1379
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001380 @classmethod
1381 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001382 manager = QueueManager2(
1383 address=address, authkey=authkey, serializer=SERIALIZER
1384 )
1385 manager.connect()
1386 queue = manager.get_queue()
1387 queue.put(('hello world', None, True, 2.25))
1388
1389 def test_remote(self):
1390 authkey = os.urandom(32)
1391
1392 manager = QueueManager(
1393 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1394 )
1395 manager.start()
1396
1397 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001398 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001399 p.start()
1400
1401 manager2 = QueueManager2(
1402 address=manager.address, authkey=authkey, serializer=SERIALIZER
1403 )
1404 manager2.connect()
1405 queue = manager2.get_queue()
1406
1407 # Note that xmlrpclib will deserialize object as a list not a tuple
1408 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1409
1410 # Because we are using xmlrpclib for serialization instead of
1411 # pickle this will cause a serialization error.
1412 self.assertRaises(Exception, queue.put, time.sleep)
1413
1414 # Make queue finalizer run before the server is stopped
1415 del queue
1416 manager.shutdown()
1417
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001418class _TestManagerRestart(BaseTestCase):
1419
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001420 @classmethod
1421 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001422 manager = QueueManager(
1423 address=address, authkey=authkey, serializer=SERIALIZER)
1424 manager.connect()
1425 queue = manager.get_queue()
1426 queue.put('hello world')
1427
1428 def test_rapid_restart(self):
1429 authkey = os.urandom(32)
1430 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001431 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin50be1ca2010-11-01 05:10:44 +00001432 srvr = manager.get_server()
1433 addr = srvr.address
1434 # Close the connection.Listener socket which gets opened as a part
1435 # of manager.get_server(). It's not needed for the test.
1436 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001437 manager.start()
1438
1439 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001440 p.daemon = True
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001441 p.start()
1442 queue = manager.get_queue()
1443 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001444 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001445 manager.shutdown()
1446 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001447 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02001448 try:
1449 manager.start()
1450 except IOError as e:
1451 if e.errno != errno.EADDRINUSE:
1452 raise
1453 # Retry after some time, in case the old socket was lingering
1454 # (sporadic failure on buildbots)
1455 time.sleep(1.0)
1456 manager = QueueManager(
1457 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller35d1f002009-03-30 22:59:27 +00001458 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001459
Benjamin Petersone711caf2008-06-11 16:44:04 +00001460#
1461#
1462#
1463
1464SENTINEL = latin('')
1465
1466class _TestConnection(BaseTestCase):
1467
1468 ALLOWED_TYPES = ('processes', 'threads')
1469
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001470 @classmethod
1471 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001472 for msg in iter(conn.recv_bytes, SENTINEL):
1473 conn.send_bytes(msg)
1474 conn.close()
1475
1476 def test_connection(self):
1477 conn, child_conn = self.Pipe()
1478
1479 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001480 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001481 p.start()
1482
1483 seq = [1, 2.25, None]
1484 msg = latin('hello world')
1485 longmsg = msg * 10
1486 arr = array.array('i', list(range(4)))
1487
1488 if self.TYPE == 'processes':
1489 self.assertEqual(type(conn.fileno()), int)
1490
1491 self.assertEqual(conn.send(seq), None)
1492 self.assertEqual(conn.recv(), seq)
1493
1494 self.assertEqual(conn.send_bytes(msg), None)
1495 self.assertEqual(conn.recv_bytes(), msg)
1496
1497 if self.TYPE == 'processes':
1498 buffer = array.array('i', [0]*10)
1499 expected = list(arr) + [0] * (10 - len(arr))
1500 self.assertEqual(conn.send_bytes(arr), None)
1501 self.assertEqual(conn.recv_bytes_into(buffer),
1502 len(arr) * buffer.itemsize)
1503 self.assertEqual(list(buffer), expected)
1504
1505 buffer = array.array('i', [0]*10)
1506 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1507 self.assertEqual(conn.send_bytes(arr), None)
1508 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1509 len(arr) * buffer.itemsize)
1510 self.assertEqual(list(buffer), expected)
1511
1512 buffer = bytearray(latin(' ' * 40))
1513 self.assertEqual(conn.send_bytes(longmsg), None)
1514 try:
1515 res = conn.recv_bytes_into(buffer)
1516 except multiprocessing.BufferTooShort as e:
1517 self.assertEqual(e.args, (longmsg,))
1518 else:
1519 self.fail('expected BufferTooShort, got %s' % res)
1520
1521 poll = TimingWrapper(conn.poll)
1522
1523 self.assertEqual(poll(), False)
1524 self.assertTimingAlmostEqual(poll.elapsed, 0)
1525
1526 self.assertEqual(poll(TIMEOUT1), False)
1527 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1528
1529 conn.send(None)
1530
1531 self.assertEqual(poll(TIMEOUT1), True)
1532 self.assertTimingAlmostEqual(poll.elapsed, 0)
1533
1534 self.assertEqual(conn.recv(), None)
1535
1536 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1537 conn.send_bytes(really_big_msg)
1538 self.assertEqual(conn.recv_bytes(), really_big_msg)
1539
1540 conn.send_bytes(SENTINEL) # tell child to quit
1541 child_conn.close()
1542
1543 if self.TYPE == 'processes':
1544 self.assertEqual(conn.readable, True)
1545 self.assertEqual(conn.writable, True)
1546 self.assertRaises(EOFError, conn.recv)
1547 self.assertRaises(EOFError, conn.recv_bytes)
1548
1549 p.join()
1550
1551 def test_duplex_false(self):
1552 reader, writer = self.Pipe(duplex=False)
1553 self.assertEqual(writer.send(1), None)
1554 self.assertEqual(reader.recv(), 1)
1555 if self.TYPE == 'processes':
1556 self.assertEqual(reader.readable, True)
1557 self.assertEqual(reader.writable, False)
1558 self.assertEqual(writer.readable, False)
1559 self.assertEqual(writer.writable, True)
1560 self.assertRaises(IOError, reader.send, 2)
1561 self.assertRaises(IOError, writer.recv)
1562 self.assertRaises(IOError, writer.poll)
1563
1564 def test_spawn_close(self):
1565 # We test that a pipe connection can be closed by parent
1566 # process immediately after child is spawned. On Windows this
1567 # would have sometimes failed on old versions because
1568 # child_conn would be closed before the child got a chance to
1569 # duplicate it.
1570 conn, child_conn = self.Pipe()
1571
1572 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001573 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001574 p.start()
1575 child_conn.close() # this might complete before child initializes
1576
1577 msg = latin('hello')
1578 conn.send_bytes(msg)
1579 self.assertEqual(conn.recv_bytes(), msg)
1580
1581 conn.send_bytes(SENTINEL)
1582 conn.close()
1583 p.join()
1584
1585 def test_sendbytes(self):
1586 if self.TYPE != 'processes':
1587 return
1588
1589 msg = latin('abcdefghijklmnopqrstuvwxyz')
1590 a, b = self.Pipe()
1591
1592 a.send_bytes(msg)
1593 self.assertEqual(b.recv_bytes(), msg)
1594
1595 a.send_bytes(msg, 5)
1596 self.assertEqual(b.recv_bytes(), msg[5:])
1597
1598 a.send_bytes(msg, 7, 8)
1599 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1600
1601 a.send_bytes(msg, 26)
1602 self.assertEqual(b.recv_bytes(), latin(''))
1603
1604 a.send_bytes(msg, 26, 0)
1605 self.assertEqual(b.recv_bytes(), latin(''))
1606
1607 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1608
1609 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1610
1611 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1612
1613 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1614
1615 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1616
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001617 @classmethod
1618 def _is_fd_assigned(cls, fd):
1619 try:
1620 os.fstat(fd)
1621 except OSError as e:
1622 if e.errno == errno.EBADF:
1623 return False
1624 raise
1625 else:
1626 return True
1627
1628 @classmethod
1629 def _writefd(cls, conn, data, create_dummy_fds=False):
1630 if create_dummy_fds:
1631 for i in range(0, 256):
1632 if not cls._is_fd_assigned(i):
1633 os.dup2(conn.fileno(), i)
1634 fd = reduction.recv_handle(conn)
1635 if msvcrt:
1636 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1637 os.write(fd, data)
1638 os.close(fd)
1639
Charles-François Natalibc8f0822011-09-20 20:36:51 +02001640 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001641 def test_fd_transfer(self):
1642 if self.TYPE != 'processes':
1643 self.skipTest("only makes sense with processes")
1644 conn, child_conn = self.Pipe(duplex=True)
1645
1646 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea94f964f2011-09-09 20:26:57 +02001647 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001648 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02001649 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001650 with open(test.support.TESTFN, "wb") as f:
1651 fd = f.fileno()
1652 if msvcrt:
1653 fd = msvcrt.get_osfhandle(fd)
1654 reduction.send_handle(conn, fd, p.pid)
1655 p.join()
1656 with open(test.support.TESTFN, "rb") as f:
1657 self.assertEqual(f.read(), b"foo")
1658
Charles-François Natalibc8f0822011-09-20 20:36:51 +02001659 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001660 @unittest.skipIf(sys.platform == "win32",
1661 "test semantics don't make sense on Windows")
1662 @unittest.skipIf(MAXFD <= 256,
1663 "largest assignable fd number is too small")
1664 @unittest.skipUnless(hasattr(os, "dup2"),
1665 "test needs os.dup2()")
1666 def test_large_fd_transfer(self):
1667 # With fd > 256 (issue #11657)
1668 if self.TYPE != 'processes':
1669 self.skipTest("only makes sense with processes")
1670 conn, child_conn = self.Pipe(duplex=True)
1671
1672 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea94f964f2011-09-09 20:26:57 +02001673 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001674 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02001675 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001676 with open(test.support.TESTFN, "wb") as f:
1677 fd = f.fileno()
1678 for newfd in range(256, MAXFD):
1679 if not self._is_fd_assigned(newfd):
1680 break
1681 else:
1682 self.fail("could not find an unassigned large file descriptor")
1683 os.dup2(fd, newfd)
1684 try:
1685 reduction.send_handle(conn, newfd, p.pid)
1686 finally:
1687 os.close(newfd)
1688 p.join()
1689 with open(test.support.TESTFN, "rb") as f:
1690 self.assertEqual(f.read(), b"bar")
1691
Jesus Cea4507e642011-09-21 03:53:25 +02001692 @classmethod
1693 def _send_data_without_fd(self, conn):
1694 os.write(conn.fileno(), b"\0")
1695
Charles-François Natalie51c8da2011-09-21 18:48:21 +02001696 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Cea4507e642011-09-21 03:53:25 +02001697 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
1698 def test_missing_fd_transfer(self):
1699 # Check that exception is raised when received data is not
1700 # accompanied by a file descriptor in ancillary data.
1701 if self.TYPE != 'processes':
1702 self.skipTest("only makes sense with processes")
1703 conn, child_conn = self.Pipe(duplex=True)
1704
1705 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
1706 p.daemon = True
1707 p.start()
1708 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
1709 p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001710
Benjamin Petersone711caf2008-06-11 16:44:04 +00001711class _TestListenerClient(BaseTestCase):
1712
1713 ALLOWED_TYPES = ('processes', 'threads')
1714
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001715 @classmethod
1716 def _test(cls, address):
1717 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001718 conn.send('hello')
1719 conn.close()
1720
1721 def test_listener_client(self):
1722 for family in self.connection.families:
1723 l = self.connection.Listener(family=family)
1724 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001725 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001726 p.start()
1727 conn = l.accept()
1728 self.assertEqual(conn.recv(), 'hello')
1729 p.join()
1730 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001731#
1732# Test of sending connection and socket objects between processes
1733#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001734"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001735class _TestPicklingConnections(BaseTestCase):
1736
1737 ALLOWED_TYPES = ('processes',)
1738
1739 def _listener(self, conn, families):
1740 for fam in families:
1741 l = self.connection.Listener(family=fam)
1742 conn.send(l.address)
1743 new_conn = l.accept()
1744 conn.send(new_conn)
1745
1746 if self.TYPE == 'processes':
1747 l = socket.socket()
1748 l.bind(('localhost', 0))
1749 conn.send(l.getsockname())
1750 l.listen(1)
1751 new_conn, addr = l.accept()
1752 conn.send(new_conn)
1753
1754 conn.recv()
1755
1756 def _remote(self, conn):
1757 for (address, msg) in iter(conn.recv, None):
1758 client = self.connection.Client(address)
1759 client.send(msg.upper())
1760 client.close()
1761
1762 if self.TYPE == 'processes':
1763 address, msg = conn.recv()
1764 client = socket.socket()
1765 client.connect(address)
1766 client.sendall(msg.upper())
1767 client.close()
1768
1769 conn.close()
1770
1771 def test_pickling(self):
1772 try:
1773 multiprocessing.allow_connection_pickling()
1774 except ImportError:
1775 return
1776
1777 families = self.connection.families
1778
1779 lconn, lconn0 = self.Pipe()
1780 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02001781 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001782 lp.start()
1783 lconn0.close()
1784
1785 rconn, rconn0 = self.Pipe()
1786 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001787 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001788 rp.start()
1789 rconn0.close()
1790
1791 for fam in families:
1792 msg = ('This connection uses family %s' % fam).encode('ascii')
1793 address = lconn.recv()
1794 rconn.send((address, msg))
1795 new_conn = lconn.recv()
1796 self.assertEqual(new_conn.recv(), msg.upper())
1797
1798 rconn.send(None)
1799
1800 if self.TYPE == 'processes':
1801 msg = latin('This connection uses a normal socket')
1802 address = lconn.recv()
1803 rconn.send((address, msg))
1804 if hasattr(socket, 'fromfd'):
1805 new_conn = lconn.recv()
1806 self.assertEqual(new_conn.recv(100), msg.upper())
1807 else:
1808 # XXX On Windows with Py2.6 need to backport fromfd()
1809 discard = lconn.recv_bytes()
1810
1811 lconn.send(None)
1812
1813 rconn.close()
1814 lconn.close()
1815
1816 lp.join()
1817 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001818"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001819#
1820#
1821#
1822
1823class _TestHeap(BaseTestCase):
1824
1825 ALLOWED_TYPES = ('processes',)
1826
1827 def test_heap(self):
1828 iterations = 5000
1829 maxblocks = 50
1830 blocks = []
1831
1832 # create and destroy lots of blocks of different sizes
1833 for i in range(iterations):
1834 size = int(random.lognormvariate(0, 1) * 1000)
1835 b = multiprocessing.heap.BufferWrapper(size)
1836 blocks.append(b)
1837 if len(blocks) > maxblocks:
1838 i = random.randrange(maxblocks)
1839 del blocks[i]
1840
1841 # get the heap object
1842 heap = multiprocessing.heap.BufferWrapper._heap
1843
1844 # verify the state of the heap
1845 all = []
1846 occupied = 0
Charles-François Natali778db492011-07-02 14:35:49 +02001847 heap._lock.acquire()
1848 self.addCleanup(heap._lock.release)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001849 for L in list(heap._len_to_seq.values()):
1850 for arena, start, stop in L:
1851 all.append((heap._arenas.index(arena), start, stop,
1852 stop-start, 'free'))
1853 for arena, start, stop in heap._allocated_blocks:
1854 all.append((heap._arenas.index(arena), start, stop,
1855 stop-start, 'occupied'))
1856 occupied += (stop-start)
1857
1858 all.sort()
1859
1860 for i in range(len(all)-1):
1861 (arena, start, stop) = all[i][:3]
1862 (narena, nstart, nstop) = all[i+1][:3]
1863 self.assertTrue((arena != narena and nstart == 0) or
1864 (stop == nstart))
1865
Charles-François Natali778db492011-07-02 14:35:49 +02001866 def test_free_from_gc(self):
1867 # Check that freeing of blocks by the garbage collector doesn't deadlock
1868 # (issue #12352).
1869 # Make sure the GC is enabled, and set lower collection thresholds to
1870 # make collections more frequent (and increase the probability of
1871 # deadlock).
1872 if not gc.isenabled():
1873 gc.enable()
1874 self.addCleanup(gc.disable)
1875 thresholds = gc.get_threshold()
1876 self.addCleanup(gc.set_threshold, *thresholds)
1877 gc.set_threshold(10)
1878
1879 # perform numerous block allocations, with cyclic references to make
1880 # sure objects are collected asynchronously by the gc
1881 for i in range(5000):
1882 a = multiprocessing.heap.BufferWrapper(1)
1883 b = multiprocessing.heap.BufferWrapper(1)
1884 # circular references
1885 a.buddy = b
1886 b.buddy = a
1887
Benjamin Petersone711caf2008-06-11 16:44:04 +00001888#
1889#
1890#
1891
Benjamin Petersone711caf2008-06-11 16:44:04 +00001892class _Foo(Structure):
1893 _fields_ = [
1894 ('x', c_int),
1895 ('y', c_double)
1896 ]
1897
1898class _TestSharedCTypes(BaseTestCase):
1899
1900 ALLOWED_TYPES = ('processes',)
1901
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00001902 def setUp(self):
1903 if not HAS_SHAREDCTYPES:
1904 self.skipTest("requires multiprocessing.sharedctypes")
1905
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001906 @classmethod
1907 def _double(cls, x, y, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001908 x.value *= 2
1909 y.value *= 2
1910 foo.x *= 2
1911 foo.y *= 2
1912 string.value *= 2
1913 for i in range(len(arr)):
1914 arr[i] *= 2
1915
1916 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001917 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001918 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001919 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001920 arr = self.Array('d', list(range(10)), lock=lock)
1921 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00001922 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001923
1924 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea94f964f2011-09-09 20:26:57 +02001925 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001926 p.start()
1927 p.join()
1928
1929 self.assertEqual(x.value, 14)
1930 self.assertAlmostEqual(y.value, 2.0/3.0)
1931 self.assertEqual(foo.x, 6)
1932 self.assertAlmostEqual(foo.y, 4.0)
1933 for i in range(10):
1934 self.assertAlmostEqual(arr[i], i*2)
1935 self.assertEqual(string.value, latin('hellohello'))
1936
1937 def test_synchronize(self):
1938 self.test_sharedctypes(lock=True)
1939
1940 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001941 foo = _Foo(2, 5.0)
Brian Curtinafa88b52010-10-07 01:12:19 +00001942 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001943 foo.x = 0
1944 foo.y = 0
1945 self.assertEqual(bar.x, 2)
1946 self.assertAlmostEqual(bar.y, 5.0)
1947
1948#
1949#
1950#
1951
1952class _TestFinalize(BaseTestCase):
1953
1954 ALLOWED_TYPES = ('processes',)
1955
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001956 @classmethod
1957 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001958 class Foo(object):
1959 pass
1960
1961 a = Foo()
1962 util.Finalize(a, conn.send, args=('a',))
1963 del a # triggers callback for a
1964
1965 b = Foo()
1966 close_b = util.Finalize(b, conn.send, args=('b',))
1967 close_b() # triggers callback for b
1968 close_b() # does nothing because callback has already been called
1969 del b # does nothing because callback has already been called
1970
1971 c = Foo()
1972 util.Finalize(c, conn.send, args=('c',))
1973
1974 d10 = Foo()
1975 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1976
1977 d01 = Foo()
1978 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1979 d02 = Foo()
1980 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1981 d03 = Foo()
1982 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1983
1984 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1985
1986 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1987
Ezio Melotti13925002011-03-16 11:05:33 +02001988 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00001989 # garbage collecting locals
1990 util._exit_function()
1991 conn.close()
1992 os._exit(0)
1993
1994 def test_finalize(self):
1995 conn, child_conn = self.Pipe()
1996
1997 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001998 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001999 p.start()
2000 p.join()
2001
2002 result = [obj for obj in iter(conn.recv, 'STOP')]
2003 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2004
2005#
2006# Test that from ... import * works for each module
2007#
2008
2009class _TestImportStar(BaseTestCase):
2010
2011 ALLOWED_TYPES = ('processes',)
2012
2013 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002014 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00002015 'multiprocessing', 'multiprocessing.connection',
2016 'multiprocessing.heap', 'multiprocessing.managers',
2017 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersone711caf2008-06-11 16:44:04 +00002018 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002019 ]
2020
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002021 if HAS_REDUCTION:
2022 modules.append('multiprocessing.reduction')
2023
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002024 if c_int is not None:
2025 # This module requires _ctypes
2026 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002027
2028 for name in modules:
2029 __import__(name)
2030 mod = sys.modules[name]
2031
2032 for attr in getattr(mod, '__all__', ()):
2033 self.assertTrue(
2034 hasattr(mod, attr),
2035 '%r does not have attribute %r' % (mod, attr)
2036 )
2037
2038#
2039# Quick test that logging works -- does not test logging output
2040#
2041
2042class _TestLogging(BaseTestCase):
2043
2044 ALLOWED_TYPES = ('processes',)
2045
2046 def test_enable_logging(self):
2047 logger = multiprocessing.get_logger()
2048 logger.setLevel(util.SUBWARNING)
2049 self.assertTrue(logger is not None)
2050 logger.debug('this will not be printed')
2051 logger.info('nor will this')
2052 logger.setLevel(LOG_LEVEL)
2053
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002054 @classmethod
2055 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002056 logger = multiprocessing.get_logger()
2057 conn.send(logger.getEffectiveLevel())
2058
2059 def test_level(self):
2060 LEVEL1 = 32
2061 LEVEL2 = 37
2062
2063 logger = multiprocessing.get_logger()
2064 root_logger = logging.getLogger()
2065 root_level = root_logger.level
2066
2067 reader, writer = multiprocessing.Pipe(duplex=False)
2068
2069 logger.setLevel(LEVEL1)
Jesus Cea94f964f2011-09-09 20:26:57 +02002070 p = self.Process(target=self._test_level, args=(writer,))
2071 p.daemon = True
2072 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002073 self.assertEqual(LEVEL1, reader.recv())
2074
2075 logger.setLevel(logging.NOTSET)
2076 root_logger.setLevel(LEVEL2)
Jesus Cea94f964f2011-09-09 20:26:57 +02002077 p = self.Process(target=self._test_level, args=(writer,))
2078 p.daemon = True
2079 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002080 self.assertEqual(LEVEL2, reader.recv())
2081
2082 root_logger.setLevel(root_level)
2083 logger.setLevel(level=LOG_LEVEL)
2084
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002085
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002086# class _TestLoggingProcessName(BaseTestCase):
2087#
2088# def handle(self, record):
2089# assert record.processName == multiprocessing.current_process().name
2090# self.__handled = True
2091#
2092# def test_logging(self):
2093# handler = logging.Handler()
2094# handler.handle = self.handle
2095# self.__handled = False
2096# # Bypass getLogger() and side-effects
2097# logger = logging.getLoggerClass()(
2098# 'multiprocessing.test.TestLoggingProcessName')
2099# logger.addHandler(handler)
2100# logger.propagate = False
2101#
2102# logger.warn('foo')
2103# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002104
Benjamin Petersone711caf2008-06-11 16:44:04 +00002105#
Jesse Noller6214edd2009-01-19 16:23:53 +00002106# Test to verify handle verification, see issue 3321
2107#
2108
2109class TestInvalidHandle(unittest.TestCase):
2110
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002111 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00002112 def test_invalid_handles(self):
Antoine Pitrou87cf2202011-05-09 17:04:27 +02002113 conn = multiprocessing.connection.Connection(44977608)
2114 try:
2115 self.assertRaises((ValueError, IOError), conn.poll)
2116 finally:
2117 # Hack private attribute _handle to avoid printing an error
2118 # in conn.__del__
2119 conn._handle = None
2120 self.assertRaises((ValueError, IOError),
2121 multiprocessing.connection.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002122
Jesse Noller6214edd2009-01-19 16:23:53 +00002123#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002124# Functions used to create test cases from the base ones in this module
2125#
2126
2127def get_attributes(Source, names):
2128 d = {}
2129 for name in names:
2130 obj = getattr(Source, name)
2131 if type(obj) == type(get_attributes):
2132 obj = staticmethod(obj)
2133 d[name] = obj
2134 return d
2135
2136def create_test_cases(Mixin, type):
2137 result = {}
2138 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002139 Type = type.capitalize()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002140
2141 for name in list(glob.keys()):
2142 if name.startswith('_Test'):
2143 base = glob[name]
2144 if type in base.ALLOWED_TYPES:
2145 newname = 'With' + Type + name[1:]
2146 class Temp(base, unittest.TestCase, Mixin):
2147 pass
2148 result[newname] = Temp
2149 Temp.__name__ = newname
2150 Temp.__module__ = Mixin.__module__
2151 return result
2152
2153#
2154# Create test cases
2155#
2156
2157class ProcessesMixin(object):
2158 TYPE = 'processes'
2159 Process = multiprocessing.Process
2160 locals().update(get_attributes(multiprocessing, (
2161 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2162 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2163 'RawArray', 'current_process', 'active_children', 'Pipe',
2164 'connection', 'JoinableQueue'
2165 )))
2166
2167testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2168globals().update(testcases_processes)
2169
2170
2171class ManagerMixin(object):
2172 TYPE = 'manager'
2173 Process = multiprocessing.Process
2174 manager = object.__new__(multiprocessing.managers.SyncManager)
2175 locals().update(get_attributes(manager, (
2176 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2177 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
2178 'Namespace', 'JoinableQueue'
2179 )))
2180
2181testcases_manager = create_test_cases(ManagerMixin, type='manager')
2182globals().update(testcases_manager)
2183
2184
2185class ThreadsMixin(object):
2186 TYPE = 'threads'
2187 Process = multiprocessing.dummy.Process
2188 locals().update(get_attributes(multiprocessing.dummy, (
2189 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2190 'Condition', 'Event', 'Value', 'Array', 'current_process',
2191 'active_children', 'Pipe', 'connection', 'dict', 'list',
2192 'Namespace', 'JoinableQueue'
2193 )))
2194
2195testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2196globals().update(testcases_threads)
2197
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002198class OtherTest(unittest.TestCase):
2199 # TODO: add more tests for deliver/answer challenge.
2200 def test_deliver_challenge_auth_failure(self):
2201 class _FakeConnection(object):
2202 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00002203 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002204 def send_bytes(self, data):
2205 pass
2206 self.assertRaises(multiprocessing.AuthenticationError,
2207 multiprocessing.connection.deliver_challenge,
2208 _FakeConnection(), b'abc')
2209
2210 def test_answer_challenge_auth_failure(self):
2211 class _FakeConnection(object):
2212 def __init__(self):
2213 self.count = 0
2214 def recv_bytes(self, size):
2215 self.count += 1
2216 if self.count == 1:
2217 return multiprocessing.connection.CHALLENGE
2218 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00002219 return b'something bogus'
2220 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002221 def send_bytes(self, data):
2222 pass
2223 self.assertRaises(multiprocessing.AuthenticationError,
2224 multiprocessing.connection.answer_challenge,
2225 _FakeConnection(), b'abc')
2226
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002227#
2228# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2229#
2230
2231def initializer(ns):
2232 ns.test += 1
2233
2234class TestInitializers(unittest.TestCase):
2235 def setUp(self):
2236 self.mgr = multiprocessing.Manager()
2237 self.ns = self.mgr.Namespace()
2238 self.ns.test = 0
2239
2240 def tearDown(self):
2241 self.mgr.shutdown()
2242
2243 def test_manager_initializer(self):
2244 m = multiprocessing.managers.SyncManager()
2245 self.assertRaises(TypeError, m.start, 1)
2246 m.start(initializer, (self.ns,))
2247 self.assertEqual(self.ns.test, 1)
2248 m.shutdown()
2249
2250 def test_pool_initializer(self):
2251 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2252 p = multiprocessing.Pool(1, initializer, (self.ns,))
2253 p.close()
2254 p.join()
2255 self.assertEqual(self.ns.test, 1)
2256
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002257#
2258# Issue 5155, 5313, 5331: Test process in processes
2259# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2260#
2261
2262def _ThisSubProcess(q):
2263 try:
2264 item = q.get(block=False)
2265 except pyqueue.Empty:
2266 pass
2267
2268def _TestProcess(q):
2269 queue = multiprocessing.Queue()
2270 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002271 subProc.daemon = True
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002272 subProc.start()
2273 subProc.join()
2274
2275def _afunc(x):
2276 return x*x
2277
2278def pool_in_process():
2279 pool = multiprocessing.Pool(processes=4)
2280 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2281
2282class _file_like(object):
2283 def __init__(self, delegate):
2284 self._delegate = delegate
2285 self._pid = None
2286
2287 @property
2288 def cache(self):
2289 pid = os.getpid()
2290 # There are no race conditions since fork keeps only the running thread
2291 if pid != self._pid:
2292 self._pid = pid
2293 self._cache = []
2294 return self._cache
2295
2296 def write(self, data):
2297 self.cache.append(data)
2298
2299 def flush(self):
2300 self._delegate.write(''.join(self.cache))
2301 self._cache = []
2302
2303class TestStdinBadfiledescriptor(unittest.TestCase):
2304
2305 def test_queue_in_process(self):
2306 queue = multiprocessing.Queue()
2307 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
2308 proc.start()
2309 proc.join()
2310
2311 def test_pool_in_process(self):
2312 p = multiprocessing.Process(target=pool_in_process)
2313 p.start()
2314 p.join()
2315
2316 def test_flushing(self):
2317 sio = io.StringIO()
2318 flike = _file_like(sio)
2319 flike.write('foo')
2320 proc = multiprocessing.Process(target=lambda: flike.flush())
2321 flike.flush()
2322 assert sio.getvalue() == 'foo'
2323
2324testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
2325 TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002326
Benjamin Petersone711caf2008-06-11 16:44:04 +00002327#
2328#
2329#
2330
2331def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00002332 if sys.platform.startswith("linux"):
2333 try:
2334 lock = multiprocessing.RLock()
2335 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00002336 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00002337
Benjamin Petersone711caf2008-06-11 16:44:04 +00002338 if run is None:
2339 from test.support import run_unittest as run
2340
2341 util.get_temp_dir() # creates temp directory for use by all processes
2342
2343 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2344
Benjamin Peterson41181742008-07-02 20:22:54 +00002345 ProcessesMixin.pool = multiprocessing.Pool(4)
2346 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2347 ManagerMixin.manager.__init__()
2348 ManagerMixin.manager.start()
2349 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002350
2351 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00002352 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2353 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002354 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2355 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00002356 )
2357
2358 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2359 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
2360 run(suite)
2361
Benjamin Peterson41181742008-07-02 20:22:54 +00002362 ThreadsMixin.pool.terminate()
2363 ProcessesMixin.pool.terminate()
2364 ManagerMixin.pool.terminate()
2365 ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002366
Benjamin Peterson41181742008-07-02 20:22:54 +00002367 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002368
2369def main():
2370 test_main(unittest.TextTestRunner(verbosity=2).run)
2371
2372if __name__ == '__main__':
2373 main()