blob: cf51307c282eb07777e847c57625e467f1cb0dfd [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module implementing queues
3#
4# multiprocessing/queues.py
5#
R. David Murray3fc969a2010-12-14 01:38:16 +00006# Copyright (c) 2006-2008, R Oudkerk
7# All rights reserved.
8#
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions
11# are met:
12#
13# 1. Redistributions of source code must retain the above copyright
14# notice, this list of conditions and the following disclaimer.
15# 2. Redistributions in binary form must reproduce the above copyright
16# notice, this list of conditions and the following disclaimer in the
17# documentation and/or other materials provided with the distribution.
18# 3. Neither the name of author nor the names of any contributors may be
19# used to endorse or promote products derived from this software
20# without specific prior written permission.
21#
22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32# SUCH DAMAGE.
Benjamin Petersone711caf2008-06-11 16:44:04 +000033#
34
Jesse Noller14f3ae22009-03-31 03:37:07 +000035__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']
Benjamin Petersone711caf2008-06-11 16:44:04 +000036
37import sys
38import os
39import threading
40import collections
41import time
42import atexit
43import weakref
Antoine Pitroudc19c242011-07-16 01:51:58 +020044import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000045
46from queue import Empty, Full
47import _multiprocessing
Antoine Pitroudd696492011-06-08 17:21:55 +020048from multiprocessing.connection import Pipe, SentinelReady
Benjamin Petersone711caf2008-06-11 16:44:04 +000049from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
50from multiprocessing.util import debug, info, Finalize, register_after_fork
51from multiprocessing.forking import assert_spawning
52
53#
54# Queue type using a pipe, buffer and thread
55#
56
57class Queue(object):
58
59 def __init__(self, maxsize=0):
60 if maxsize <= 0:
61 maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
62 self._maxsize = maxsize
63 self._reader, self._writer = Pipe(duplex=False)
64 self._rlock = Lock()
65 self._opid = os.getpid()
66 if sys.platform == 'win32':
67 self._wlock = None
68 else:
69 self._wlock = Lock()
70 self._sem = BoundedSemaphore(maxsize)
Antoine Pitroudc19c242011-07-16 01:51:58 +020071 # For use by concurrent.futures
72 self._ignore_epipe = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000073
74 self._after_fork()
75
76 if sys.platform != 'win32':
77 register_after_fork(self, Queue._after_fork)
78
79 def __getstate__(self):
80 assert_spawning(self)
Antoine Pitroufb960892011-07-20 02:01:39 +020081 return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
Benjamin Petersone711caf2008-06-11 16:44:04 +000082 self._rlock, self._wlock, self._sem, self._opid)
83
84 def __setstate__(self, state):
Antoine Pitroufb960892011-07-20 02:01:39 +020085 (self._ignore_epipe, self._maxsize, self._reader, self._writer,
Benjamin Petersone711caf2008-06-11 16:44:04 +000086 self._rlock, self._wlock, self._sem, self._opid) = state
87 self._after_fork()
88
89 def _after_fork(self):
90 debug('Queue._after_fork()')
91 self._notempty = threading.Condition(threading.Lock())
92 self._buffer = collections.deque()
93 self._thread = None
94 self._jointhread = None
95 self._joincancelled = False
96 self._closed = False
97 self._close = None
98 self._send = self._writer.send
99 self._recv = self._reader.recv
100 self._poll = self._reader.poll
101
102 def put(self, obj, block=True, timeout=None):
103 assert not self._closed
104 if not self._sem.acquire(block, timeout):
105 raise Full
106
107 self._notempty.acquire()
108 try:
109 if self._thread is None:
110 self._start_thread()
111 self._buffer.append(obj)
112 self._notempty.notify()
113 finally:
114 self._notempty.release()
115
116 def get(self, block=True, timeout=None):
117 if block and timeout is None:
118 self._rlock.acquire()
119 try:
120 res = self._recv()
121 self._sem.release()
122 return res
123 finally:
124 self._rlock.release()
125
126 else:
127 if block:
128 deadline = time.time() + timeout
129 if not self._rlock.acquire(block, timeout):
130 raise Empty
131 try:
Antoine Pitroua3651132011-11-10 00:37:09 +0100132 if block:
133 timeout = deadline - time.time()
134 if timeout < 0 or not self._poll(timeout):
135 raise Empty
136 elif not self._poll():
Benjamin Petersone711caf2008-06-11 16:44:04 +0000137 raise Empty
138 res = self._recv()
139 self._sem.release()
140 return res
141 finally:
142 self._rlock.release()
143
144 def qsize(self):
Benjamin Peterson87c8d872009-06-11 22:54:11 +0000145 # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000146 return self._maxsize - self._sem._semlock._get_value()
147
148 def empty(self):
149 return not self._poll()
150
151 def full(self):
152 return self._sem._semlock._is_zero()
153
154 def get_nowait(self):
155 return self.get(False)
156
157 def put_nowait(self, obj):
158 return self.put(obj, False)
159
160 def close(self):
161 self._closed = True
162 self._reader.close()
163 if self._close:
164 self._close()
165
166 def join_thread(self):
167 debug('Queue.join_thread()')
168 assert self._closed
169 if self._jointhread:
170 self._jointhread()
171
172 def cancel_join_thread(self):
173 debug('Queue.cancel_join_thread()')
174 self._joincancelled = True
175 try:
176 self._jointhread.cancel()
177 except AttributeError:
178 pass
179
180 def _start_thread(self):
181 debug('Queue._start_thread()')
182
183 # Start thread which transfers data from buffer to pipe
184 self._buffer.clear()
185 self._thread = threading.Thread(
186 target=Queue._feed,
187 args=(self._buffer, self._notempty, self._send,
Antoine Pitroudc19c242011-07-16 01:51:58 +0200188 self._wlock, self._writer.close, self._ignore_epipe),
Benjamin Petersone711caf2008-06-11 16:44:04 +0000189 name='QueueFeederThread'
190 )
Benjamin Peterson72753702008-08-18 18:09:21 +0000191 self._thread.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000192
193 debug('doing self._thread.start()')
194 self._thread.start()
195 debug('... done self._thread.start()')
196
197 # On process exit we will wait for data to be flushed to pipe.
198 #
199 # However, if this process created the queue then all
200 # processes which use the queue will be descendants of this
201 # process. Therefore waiting for the queue to be flushed
202 # is pointless once all the child processes have been joined.
203 created_by_this_process = (self._opid == os.getpid())
204 if not self._joincancelled and not created_by_this_process:
205 self._jointhread = Finalize(
206 self._thread, Queue._finalize_join,
207 [weakref.ref(self._thread)],
208 exitpriority=-5
209 )
210
211 # Send sentinel to the thread queue object when garbage collected
212 self._close = Finalize(
213 self, Queue._finalize_close,
214 [self._buffer, self._notempty],
215 exitpriority=10
216 )
217
218 @staticmethod
219 def _finalize_join(twr):
220 debug('joining queue thread')
221 thread = twr()
222 if thread is not None:
223 thread.join()
224 debug('... queue thread joined')
225 else:
226 debug('... queue thread already dead')
227
228 @staticmethod
229 def _finalize_close(buffer, notempty):
230 debug('telling queue thread to quit')
231 notempty.acquire()
232 try:
233 buffer.append(_sentinel)
234 notempty.notify()
235 finally:
236 notempty.release()
237
238 @staticmethod
Antoine Pitroudc19c242011-07-16 01:51:58 +0200239 def _feed(buffer, notempty, send, writelock, close, ignore_epipe):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000240 debug('starting thread to feed data to pipe')
241 from .util import is_exiting
242
243 nacquire = notempty.acquire
244 nrelease = notempty.release
245 nwait = notempty.wait
246 bpopleft = buffer.popleft
247 sentinel = _sentinel
248 if sys.platform != 'win32':
249 wacquire = writelock.acquire
250 wrelease = writelock.release
251 else:
252 wacquire = None
253
254 try:
255 while 1:
256 nacquire()
257 try:
258 if not buffer:
259 nwait()
260 finally:
261 nrelease()
262 try:
263 while 1:
264 obj = bpopleft()
265 if obj is sentinel:
266 debug('feeder thread got sentinel -- exiting')
267 close()
268 return
269
270 if wacquire is None:
271 send(obj)
272 else:
273 wacquire()
274 try:
275 send(obj)
276 finally:
277 wrelease()
278 except IndexError:
279 pass
280 except Exception as e:
Antoine Pitroudc19c242011-07-16 01:51:58 +0200281 if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
282 return
Benjamin Petersone711caf2008-06-11 16:44:04 +0000283 # Since this runs in a daemon thread the resources it uses
284 # may be become unusable while the process is cleaning up.
285 # We ignore errors which happen after the process has
286 # started to cleanup.
287 try:
288 if is_exiting():
289 info('error in queue thread: %s', e)
290 else:
291 import traceback
292 traceback.print_exc()
293 except Exception:
294 pass
295
296_sentinel = object()
297
298#
299# A queue type which also supports join() and task_done() methods
300#
301# Note that if you do not call task_done() for each finished task then
302# eventually the counter's semaphore may overflow causing Bad Things
303# to happen.
304#
305
306class JoinableQueue(Queue):
307
308 def __init__(self, maxsize=0):
309 Queue.__init__(self, maxsize)
310 self._unfinished_tasks = Semaphore(0)
311 self._cond = Condition()
312
313 def __getstate__(self):
314 return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)
315
316 def __setstate__(self, state):
317 Queue.__setstate__(self, state[:-2])
318 self._cond, self._unfinished_tasks = state[-2:]
319
Benjamin Peterson8719ad52009-09-11 22:24:02 +0000320 def put(self, obj, block=True, timeout=None):
321 assert not self._closed
322 if not self._sem.acquire(block, timeout):
323 raise Full
324
325 self._notempty.acquire()
326 self._cond.acquire()
327 try:
328 if self._thread is None:
329 self._start_thread()
330 self._buffer.append(obj)
331 self._unfinished_tasks.release()
332 self._notempty.notify()
333 finally:
334 self._cond.release()
335 self._notempty.release()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000336
337 def task_done(self):
338 self._cond.acquire()
339 try:
340 if not self._unfinished_tasks.acquire(False):
341 raise ValueError('task_done() called too many times')
342 if self._unfinished_tasks._semlock._is_zero():
343 self._cond.notify_all()
344 finally:
345 self._cond.release()
346
347 def join(self):
348 self._cond.acquire()
349 try:
350 if not self._unfinished_tasks._semlock._is_zero():
351 self._cond.wait()
352 finally:
353 self._cond.release()
354
355#
356# Simplified Queue type -- really just a locked pipe
357#
358
359class SimpleQueue(object):
360
361 def __init__(self):
362 self._reader, self._writer = Pipe(duplex=False)
363 self._rlock = Lock()
364 if sys.platform == 'win32':
365 self._wlock = None
366 else:
367 self._wlock = Lock()
368 self._make_methods()
369
370 def empty(self):
371 return not self._reader.poll()
372
373 def __getstate__(self):
374 assert_spawning(self)
375 return (self._reader, self._writer, self._rlock, self._wlock)
376
377 def __setstate__(self, state):
378 (self._reader, self._writer, self._rlock, self._wlock) = state
379 self._make_methods()
380
381 def _make_methods(self):
382 recv = self._reader.recv
383 racquire, rrelease = self._rlock.acquire, self._rlock.release
Antoine Pitroudd696492011-06-08 17:21:55 +0200384 def get(*, sentinels=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000385 racquire()
386 try:
Antoine Pitroudd696492011-06-08 17:21:55 +0200387 return recv(sentinels)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000388 finally:
389 rrelease()
390 self.get = get
391
392 if self._wlock is None:
393 # writes to a message oriented win32 pipe are atomic
394 self.put = self._writer.send
395 else:
396 send = self._writer.send
397 wacquire, wrelease = self._wlock.acquire, self._wlock.release
398 def put(obj):
399 wacquire()
400 try:
401 return send(obj)
402 finally:
403 wrelease()
404 self.put = put