blob: 67ac49cf6160762029e9c88b71c19813b7b8e61f [file] [log] [blame]
Benjamin Peterson7f03ea72008-06-13 19:20:48 +00001#
2# Module implementing queues
3#
4# multiprocessing/queues.py
5#
6# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
7#
8
Jesse Nollerb2898e02009-03-31 03:31:16 +00009__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000010
11import sys
12import os
13import threading
14import collections
15import time
16import atexit
17import weakref
18
19from Queue import Empty, Full
20import _multiprocessing
21from multiprocessing import Pipe
22from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
23from multiprocessing.util import debug, info, Finalize, register_after_fork
24from multiprocessing.forking import assert_spawning
25
26#
27# Queue type using a pipe, buffer and thread
28#
29
30class Queue(object):
31
32 def __init__(self, maxsize=0):
33 if maxsize <= 0:
34 maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
35 self._maxsize = maxsize
36 self._reader, self._writer = Pipe(duplex=False)
37 self._rlock = Lock()
38 self._opid = os.getpid()
39 if sys.platform == 'win32':
40 self._wlock = None
41 else:
42 self._wlock = Lock()
43 self._sem = BoundedSemaphore(maxsize)
44
45 self._after_fork()
46
47 if sys.platform != 'win32':
48 register_after_fork(self, Queue._after_fork)
49
Jesse Noller6c376742009-11-21 14:01:56 +000050 self.getv = 0
51
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000052 def __getstate__(self):
53 assert_spawning(self)
54 return (self._maxsize, self._reader, self._writer,
55 self._rlock, self._wlock, self._sem, self._opid)
56
57 def __setstate__(self, state):
58 (self._maxsize, self._reader, self._writer,
59 self._rlock, self._wlock, self._sem, self._opid) = state
60 self._after_fork()
61
62 def _after_fork(self):
63 debug('Queue._after_fork()')
64 self._notempty = threading.Condition(threading.Lock())
65 self._buffer = collections.deque()
66 self._thread = None
67 self._jointhread = None
68 self._joincancelled = False
69 self._closed = False
70 self._close = None
71 self._send = self._writer.send
72 self._recv = self._reader.recv
73 self._poll = self._reader.poll
74
75 def put(self, obj, block=True, timeout=None):
Jesse Noller6c376742009-11-21 14:01:56 +000076 if not isinstance(obj, list):
77 debug('put: %s', obj)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000078 assert not self._closed
79 if not self._sem.acquire(block, timeout):
80 raise Full
81
82 self._notempty.acquire()
83 try:
84 if self._thread is None:
85 self._start_thread()
86 self._buffer.append(obj)
87 self._notempty.notify()
88 finally:
89 self._notempty.release()
90
91 def get(self, block=True, timeout=None):
Jesse Noller6c376742009-11-21 14:01:56 +000092 self.getv += 1
93 debug('self.getv: %s', self.getv)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000094 if block and timeout is None:
95 self._rlock.acquire()
96 try:
97 res = self._recv()
98 self._sem.release()
Jesse Noller6c376742009-11-21 14:01:56 +000099 if not isinstance(res, list):
100 debug('get: %s', res)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000101 return res
102 finally:
103 self._rlock.release()
104
105 else:
106 if block:
107 deadline = time.time() + timeout
108 if not self._rlock.acquire(block, timeout):
109 raise Empty
110 try:
111 if not self._poll(block and (deadline-time.time()) or 0.0):
112 raise Empty
113 res = self._recv()
114 self._sem.release()
Jesse Noller6c376742009-11-21 14:01:56 +0000115 if not isinstance(res, list):
116 debug('get: %s', res)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000117 return res
118 finally:
119 self._rlock.release()
120
121 def qsize(self):
Georg Brandl0c6d1662009-06-08 18:41:36 +0000122 # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000123 return self._maxsize - self._sem._semlock._get_value()
124
125 def empty(self):
126 return not self._poll()
127
128 def full(self):
129 return self._sem._semlock._is_zero()
130
131 def get_nowait(self):
132 return self.get(False)
133
134 def put_nowait(self, obj):
135 return self.put(obj, False)
136
137 def close(self):
138 self._closed = True
139 self._reader.close()
140 if self._close:
141 self._close()
142
143 def join_thread(self):
144 debug('Queue.join_thread()')
145 assert self._closed
146 if self._jointhread:
147 self._jointhread()
148
149 def cancel_join_thread(self):
150 debug('Queue.cancel_join_thread()')
151 self._joincancelled = True
152 try:
153 self._jointhread.cancel()
154 except AttributeError:
155 pass
156
157 def _start_thread(self):
158 debug('Queue._start_thread()')
159
160 # Start thread which transfers data from buffer to pipe
161 self._buffer.clear()
162 self._thread = threading.Thread(
163 target=Queue._feed,
164 args=(self._buffer, self._notempty, self._send,
165 self._wlock, self._writer.close),
166 name='QueueFeederThread'
167 )
Benjamin Petersona9b22222008-08-18 18:01:43 +0000168 self._thread.daemon = True
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000169
170 debug('doing self._thread.start()')
171 self._thread.start()
172 debug('... done self._thread.start()')
173
174 # On process exit we will wait for data to be flushed to pipe.
175 #
176 # However, if this process created the queue then all
177 # processes which use the queue will be descendants of this
178 # process. Therefore waiting for the queue to be flushed
179 # is pointless once all the child processes have been joined.
180 created_by_this_process = (self._opid == os.getpid())
181 if not self._joincancelled and not created_by_this_process:
182 self._jointhread = Finalize(
183 self._thread, Queue._finalize_join,
184 [weakref.ref(self._thread)],
185 exitpriority=-5
186 )
187
188 # Send sentinel to the thread queue object when garbage collected
189 self._close = Finalize(
190 self, Queue._finalize_close,
191 [self._buffer, self._notempty],
192 exitpriority=10
193 )
194
195 @staticmethod
196 def _finalize_join(twr):
197 debug('joining queue thread')
198 thread = twr()
199 if thread is not None:
200 thread.join()
201 debug('... queue thread joined')
202 else:
203 debug('... queue thread already dead')
204
205 @staticmethod
206 def _finalize_close(buffer, notempty):
207 debug('telling queue thread to quit')
208 notempty.acquire()
209 try:
210 buffer.append(_sentinel)
211 notempty.notify()
212 finally:
213 notempty.release()
214
215 @staticmethod
216 def _feed(buffer, notempty, send, writelock, close):
217 debug('starting thread to feed data to pipe')
218 from .util import is_exiting
219
220 nacquire = notempty.acquire
221 nrelease = notempty.release
222 nwait = notempty.wait
223 bpopleft = buffer.popleft
224 sentinel = _sentinel
225 if sys.platform != 'win32':
226 wacquire = writelock.acquire
227 wrelease = writelock.release
228 else:
229 wacquire = None
230
231 try:
232 while 1:
233 nacquire()
234 try:
235 if not buffer:
236 nwait()
237 finally:
238 nrelease()
239 try:
240 while 1:
241 obj = bpopleft()
Jesse Noller6c376742009-11-21 14:01:56 +0000242 if not isinstance(obj, list):
243 debug('feeder thread got: %s', obj)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000244 if obj is sentinel:
245 debug('feeder thread got sentinel -- exiting')
246 close()
247 return
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000248 if wacquire is None:
Jesse Noller6c376742009-11-21 14:01:56 +0000249 if not isinstance(obj, list):
250 debug('sending to pipe: %s', obj)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000251 send(obj)
252 else:
Jesse Noller6c376742009-11-21 14:01:56 +0000253 debug('waiting on wacquire')
254 wacquire(timeout=30)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000255 try:
Jesse Noller6c376742009-11-21 14:01:56 +0000256 if not isinstance(obj, list):
257 debug('sending to pipe: %s', obj)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000258 send(obj)
259 finally:
260 wrelease()
261 except IndexError:
262 pass
263 except Exception, e:
264 # Since this runs in a daemon thread the resources it uses
265 # may be become unusable while the process is cleaning up.
266 # We ignore errors which happen after the process has
267 # started to cleanup.
268 try:
269 if is_exiting():
270 info('error in queue thread: %s', e)
271 else:
272 import traceback
273 traceback.print_exc()
274 except Exception:
275 pass
276
277_sentinel = object()
278
279#
280# A queue type which also supports join() and task_done() methods
281#
282# Note that if you do not call task_done() for each finished task then
283# eventually the counter's semaphore may overflow causing Bad Things
284# to happen.
285#
286
287class JoinableQueue(Queue):
288
289 def __init__(self, maxsize=0):
290 Queue.__init__(self, maxsize)
291 self._unfinished_tasks = Semaphore(0)
292 self._cond = Condition()
293
294 def __getstate__(self):
295 return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)
296
297 def __setstate__(self, state):
298 Queue.__setstate__(self, state[:-2])
299 self._cond, self._unfinished_tasks = state[-2:]
300
Jesse Noller8497efe2009-08-06 02:05:56 +0000301 def put(self, obj, block=True, timeout=None):
302 assert not self._closed
303 if not self._sem.acquire(block, timeout):
304 raise Full
305
306 self._notempty.acquire()
307 self._cond.acquire()
308 try:
309 if self._thread is None:
310 self._start_thread()
311 self._buffer.append(obj)
312 self._unfinished_tasks.release()
313 self._notempty.notify()
314 finally:
315 self._cond.release()
316 self._notempty.release()
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000317
318 def task_done(self):
319 self._cond.acquire()
320 try:
321 if not self._unfinished_tasks.acquire(False):
322 raise ValueError('task_done() called too many times')
323 if self._unfinished_tasks._semlock._is_zero():
324 self._cond.notify_all()
325 finally:
326 self._cond.release()
327
328 def join(self):
329 self._cond.acquire()
330 try:
331 if not self._unfinished_tasks._semlock._is_zero():
332 self._cond.wait()
333 finally:
334 self._cond.release()
335
336#
337# Simplified Queue type -- really just a locked pipe
338#
339
340class SimpleQueue(object):
341
342 def __init__(self):
343 self._reader, self._writer = Pipe(duplex=False)
344 self._rlock = Lock()
345 if sys.platform == 'win32':
346 self._wlock = None
347 else:
348 self._wlock = Lock()
349 self._make_methods()
350
351 def empty(self):
352 return not self._reader.poll()
353
354 def __getstate__(self):
355 assert_spawning(self)
356 return (self._reader, self._writer, self._rlock, self._wlock)
357
358 def __setstate__(self, state):
359 (self._reader, self._writer, self._rlock, self._wlock) = state
360 self._make_methods()
361
362 def _make_methods(self):
363 recv = self._reader.recv
364 racquire, rrelease = self._rlock.acquire, self._rlock.release
365 def get():
366 racquire()
367 try:
368 return recv()
369 finally:
370 rrelease()
371 self.get = get
372
373 if self._wlock is None:
374 # writes to a message oriented win32 pipe are atomic
375 self.put = self._writer.send
376 else:
377 send = self._writer.send
378 wacquire, wrelease = self._wlock.acquire, self._wlock.release
379 def put(obj):
380 wacquire()
381 try:
382 return send(obj)
383 finally:
384 wrelease()
385 self.put = put