blob: bb4c7d874f3db905845a6b609cc858c96ba1b289 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module implementing queues
3#
4# multiprocessing/queues.py
5#
R. David Murray3fc969a2010-12-14 01:38:16 +00006# Copyright (c) 2006-2008, R Oudkerk
7# All rights reserved.
8#
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions
11# are met:
12#
13# 1. Redistributions of source code must retain the above copyright
14# notice, this list of conditions and the following disclaimer.
15# 2. Redistributions in binary form must reproduce the above copyright
16# notice, this list of conditions and the following disclaimer in the
17# documentation and/or other materials provided with the distribution.
18# 3. Neither the name of author nor the names of any contributors may be
19# used to endorse or promote products derived from this software
20# without specific prior written permission.
21#
22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32# SUCH DAMAGE.
Benjamin Petersone711caf2008-06-11 16:44:04 +000033#
34
Jesse Noller14f3ae22009-03-31 03:37:07 +000035__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']
Benjamin Petersone711caf2008-06-11 16:44:04 +000036
37import sys
38import os
39import threading
40import collections
41import time
42import atexit
43import weakref
Antoine Pitroudc19c242011-07-16 01:51:58 +020044import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000045
46from queue import Empty, Full
47import _multiprocessing
Antoine Pitroudd696492011-06-08 17:21:55 +020048from multiprocessing.connection import Pipe, SentinelReady
Benjamin Petersone711caf2008-06-11 16:44:04 +000049from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
50from multiprocessing.util import debug, info, Finalize, register_after_fork
51from multiprocessing.forking import assert_spawning
52
53#
54# Queue type using a pipe, buffer and thread
55#
56
57class Queue(object):
58
59 def __init__(self, maxsize=0):
60 if maxsize <= 0:
61 maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
62 self._maxsize = maxsize
63 self._reader, self._writer = Pipe(duplex=False)
64 self._rlock = Lock()
65 self._opid = os.getpid()
66 if sys.platform == 'win32':
67 self._wlock = None
68 else:
69 self._wlock = Lock()
70 self._sem = BoundedSemaphore(maxsize)
Antoine Pitroudc19c242011-07-16 01:51:58 +020071 # For use by concurrent.futures
72 self._ignore_epipe = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000073
74 self._after_fork()
75
76 if sys.platform != 'win32':
77 register_after_fork(self, Queue._after_fork)
78
79 def __getstate__(self):
80 assert_spawning(self)
Antoine Pitroufb960892011-07-20 02:01:39 +020081 return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
Benjamin Petersone711caf2008-06-11 16:44:04 +000082 self._rlock, self._wlock, self._sem, self._opid)
83
84 def __setstate__(self, state):
Antoine Pitroufb960892011-07-20 02:01:39 +020085 (self._ignore_epipe, self._maxsize, self._reader, self._writer,
Benjamin Petersone711caf2008-06-11 16:44:04 +000086 self._rlock, self._wlock, self._sem, self._opid) = state
87 self._after_fork()
88
89 def _after_fork(self):
90 debug('Queue._after_fork()')
91 self._notempty = threading.Condition(threading.Lock())
92 self._buffer = collections.deque()
93 self._thread = None
94 self._jointhread = None
95 self._joincancelled = False
96 self._closed = False
97 self._close = None
98 self._send = self._writer.send
99 self._recv = self._reader.recv
100 self._poll = self._reader.poll
101
102 def put(self, obj, block=True, timeout=None):
103 assert not self._closed
104 if not self._sem.acquire(block, timeout):
105 raise Full
106
107 self._notempty.acquire()
108 try:
109 if self._thread is None:
110 self._start_thread()
111 self._buffer.append(obj)
112 self._notempty.notify()
113 finally:
114 self._notempty.release()
115
116 def get(self, block=True, timeout=None):
117 if block and timeout is None:
118 self._rlock.acquire()
119 try:
120 res = self._recv()
121 self._sem.release()
122 return res
123 finally:
124 self._rlock.release()
125
126 else:
127 if block:
128 deadline = time.time() + timeout
129 if not self._rlock.acquire(block, timeout):
130 raise Empty
131 try:
132 if not self._poll(block and (deadline-time.time()) or 0.0):
133 raise Empty
134 res = self._recv()
135 self._sem.release()
136 return res
137 finally:
138 self._rlock.release()
139
140 def qsize(self):
Benjamin Peterson87c8d872009-06-11 22:54:11 +0000141 # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000142 return self._maxsize - self._sem._semlock._get_value()
143
144 def empty(self):
145 return not self._poll()
146
147 def full(self):
148 return self._sem._semlock._is_zero()
149
150 def get_nowait(self):
151 return self.get(False)
152
153 def put_nowait(self, obj):
154 return self.put(obj, False)
155
156 def close(self):
157 self._closed = True
158 self._reader.close()
159 if self._close:
160 self._close()
161
162 def join_thread(self):
163 debug('Queue.join_thread()')
164 assert self._closed
165 if self._jointhread:
166 self._jointhread()
167
168 def cancel_join_thread(self):
169 debug('Queue.cancel_join_thread()')
170 self._joincancelled = True
171 try:
172 self._jointhread.cancel()
173 except AttributeError:
174 pass
175
176 def _start_thread(self):
177 debug('Queue._start_thread()')
178
179 # Start thread which transfers data from buffer to pipe
180 self._buffer.clear()
181 self._thread = threading.Thread(
182 target=Queue._feed,
183 args=(self._buffer, self._notempty, self._send,
Antoine Pitroudc19c242011-07-16 01:51:58 +0200184 self._wlock, self._writer.close, self._ignore_epipe),
Benjamin Petersone711caf2008-06-11 16:44:04 +0000185 name='QueueFeederThread'
186 )
Benjamin Peterson72753702008-08-18 18:09:21 +0000187 self._thread.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000188
189 debug('doing self._thread.start()')
190 self._thread.start()
191 debug('... done self._thread.start()')
192
193 # On process exit we will wait for data to be flushed to pipe.
194 #
195 # However, if this process created the queue then all
196 # processes which use the queue will be descendants of this
197 # process. Therefore waiting for the queue to be flushed
198 # is pointless once all the child processes have been joined.
199 created_by_this_process = (self._opid == os.getpid())
200 if not self._joincancelled and not created_by_this_process:
201 self._jointhread = Finalize(
202 self._thread, Queue._finalize_join,
203 [weakref.ref(self._thread)],
204 exitpriority=-5
205 )
206
207 # Send sentinel to the thread queue object when garbage collected
208 self._close = Finalize(
209 self, Queue._finalize_close,
210 [self._buffer, self._notempty],
211 exitpriority=10
212 )
213
214 @staticmethod
215 def _finalize_join(twr):
216 debug('joining queue thread')
217 thread = twr()
218 if thread is not None:
219 thread.join()
220 debug('... queue thread joined')
221 else:
222 debug('... queue thread already dead')
223
224 @staticmethod
225 def _finalize_close(buffer, notempty):
226 debug('telling queue thread to quit')
227 notempty.acquire()
228 try:
229 buffer.append(_sentinel)
230 notempty.notify()
231 finally:
232 notempty.release()
233
234 @staticmethod
Antoine Pitroudc19c242011-07-16 01:51:58 +0200235 def _feed(buffer, notempty, send, writelock, close, ignore_epipe):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000236 debug('starting thread to feed data to pipe')
237 from .util import is_exiting
238
239 nacquire = notempty.acquire
240 nrelease = notempty.release
241 nwait = notempty.wait
242 bpopleft = buffer.popleft
243 sentinel = _sentinel
244 if sys.platform != 'win32':
245 wacquire = writelock.acquire
246 wrelease = writelock.release
247 else:
248 wacquire = None
249
250 try:
251 while 1:
252 nacquire()
253 try:
254 if not buffer:
255 nwait()
256 finally:
257 nrelease()
258 try:
259 while 1:
260 obj = bpopleft()
261 if obj is sentinel:
262 debug('feeder thread got sentinel -- exiting')
263 close()
264 return
265
266 if wacquire is None:
267 send(obj)
268 else:
269 wacquire()
270 try:
271 send(obj)
272 finally:
273 wrelease()
274 except IndexError:
275 pass
276 except Exception as e:
Antoine Pitroudc19c242011-07-16 01:51:58 +0200277 if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
278 return
Benjamin Petersone711caf2008-06-11 16:44:04 +0000279 # Since this runs in a daemon thread the resources it uses
280 # may be become unusable while the process is cleaning up.
281 # We ignore errors which happen after the process has
282 # started to cleanup.
283 try:
284 if is_exiting():
285 info('error in queue thread: %s', e)
286 else:
287 import traceback
288 traceback.print_exc()
289 except Exception:
290 pass
291
292_sentinel = object()
293
294#
295# A queue type which also supports join() and task_done() methods
296#
297# Note that if you do not call task_done() for each finished task then
298# eventually the counter's semaphore may overflow causing Bad Things
299# to happen.
300#
301
302class JoinableQueue(Queue):
303
304 def __init__(self, maxsize=0):
305 Queue.__init__(self, maxsize)
306 self._unfinished_tasks = Semaphore(0)
307 self._cond = Condition()
308
309 def __getstate__(self):
310 return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)
311
312 def __setstate__(self, state):
313 Queue.__setstate__(self, state[:-2])
314 self._cond, self._unfinished_tasks = state[-2:]
315
Benjamin Peterson8719ad52009-09-11 22:24:02 +0000316 def put(self, obj, block=True, timeout=None):
317 assert not self._closed
318 if not self._sem.acquire(block, timeout):
319 raise Full
320
321 self._notempty.acquire()
322 self._cond.acquire()
323 try:
324 if self._thread is None:
325 self._start_thread()
326 self._buffer.append(obj)
327 self._unfinished_tasks.release()
328 self._notempty.notify()
329 finally:
330 self._cond.release()
331 self._notempty.release()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000332
333 def task_done(self):
334 self._cond.acquire()
335 try:
336 if not self._unfinished_tasks.acquire(False):
337 raise ValueError('task_done() called too many times')
338 if self._unfinished_tasks._semlock._is_zero():
339 self._cond.notify_all()
340 finally:
341 self._cond.release()
342
343 def join(self):
344 self._cond.acquire()
345 try:
346 if not self._unfinished_tasks._semlock._is_zero():
347 self._cond.wait()
348 finally:
349 self._cond.release()
350
351#
352# Simplified Queue type -- really just a locked pipe
353#
354
355class SimpleQueue(object):
356
357 def __init__(self):
358 self._reader, self._writer = Pipe(duplex=False)
359 self._rlock = Lock()
360 if sys.platform == 'win32':
361 self._wlock = None
362 else:
363 self._wlock = Lock()
364 self._make_methods()
365
366 def empty(self):
367 return not self._reader.poll()
368
369 def __getstate__(self):
370 assert_spawning(self)
371 return (self._reader, self._writer, self._rlock, self._wlock)
372
373 def __setstate__(self, state):
374 (self._reader, self._writer, self._rlock, self._wlock) = state
375 self._make_methods()
376
377 def _make_methods(self):
378 recv = self._reader.recv
379 racquire, rrelease = self._rlock.acquire, self._rlock.release
Antoine Pitroudd696492011-06-08 17:21:55 +0200380 def get(*, sentinels=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000381 racquire()
382 try:
Antoine Pitroudd696492011-06-08 17:21:55 +0200383 return recv(sentinels)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000384 finally:
385 rrelease()
386 self.get = get
387
388 if self._wlock is None:
389 # writes to a message oriented win32 pipe are atomic
390 self.put = self._writer.send
391 else:
392 send = self._writer.send
393 wacquire, wrelease = self._wlock.acquire, self._wlock.release
394 def put(obj):
395 wacquire()
396 try:
397 return send(obj)
398 finally:
399 wrelease()
400 self.put = put