Benjamin Peterson | 190d56e | 2008-06-11 02:40:25 +0000 | [diff] [blame] | 1 | #
|
| 2 | # Module implementing queues
|
| 3 | #
|
| 4 | # multiprocessing/queues.py
|
| 5 | #
|
| 6 | # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
|
| 7 | #
|
| 8 |
|
# Public API of this module; JoinableQueue is public too, so export it.
__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']
|
| 10 |
|
| 11 | import sys
|
| 12 | import os
|
| 13 | import threading
|
| 14 | import collections
|
| 15 | import time
|
| 16 | import atexit
|
| 17 | import weakref
|
| 18 |
|
| 19 | from Queue import Empty, Full
|
| 20 | import _multiprocessing
|
| 21 | from multiprocessing import Pipe
|
| 22 | from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
|
| 23 | from multiprocessing.util import debug, info, Finalize, register_after_fork
|
| 24 | from multiprocessing.forking import assert_spawning
|
| 25 |
|
| 26 | #
|
| 27 | # Queue type using a pipe, buffer and thread
|
| 28 | #
|
| 29 |
|
class Queue(object):
    """Multi-producer, multi-consumer queue usable from several processes.

    Objects passed to ``put()`` are appended to an in-process buffer.  A
    daemon "feeder" thread, started lazily by the first ``put()``, pops them
    from that buffer and sends them into a one-way pipe; ``get()`` receives
    from the pipe.  The bounded semaphore ``_sem`` caps the number of items
    that have been put but not yet got at *maxsize*.
    """

    def __init__(self, maxsize=0):
        # maxsize <= 0 means "as large as the platform semaphore allows".
        if maxsize <= 0:
            maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
        self._maxsize = maxsize
        self._reader, self._writer = Pipe(duplex=False)
        self._rlock = Lock()            # serializes readers in get()
        self._opid = os.getpid()        # pid of the process that created us
        if sys.platform == 'win32':
            # No write lock on win32; _feed() then sends without locking.
            self._wlock = None
        else:
            self._wlock = Lock()        # held by the feeder around send()
        self._sem = BoundedSemaphore(maxsize)

        self._after_fork()

        if sys.platform != 'win32':
            # Have multiprocessing.util re-run _after_fork() after a fork.
            register_after_fork(self, Queue._after_fork)

    def __getstate__(self):
        """Return the state shipped to a child process.

        assert_spawning() restricts when this may be pickled (presumably
        only while a child process is being spawned).
        """
        assert_spawning(self)
        return (self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def __setstate__(self, state):
        """Restore pickled state and rebuild the per-process parts."""
        (self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._after_fork()

    def _after_fork(self):
        """(Re)create state that is local to the current process.

        Creates a fresh buffer and condition, forgets any feeder thread and
        its finalizers, and caches the pipe's send/recv/poll methods.
        """
        debug('Queue._after_fork()')
        self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()
        self._thread = None
        self._jointhread = None
        self._joincancelled = False
        self._closed = False
        self._close = None
        self._send = self._writer.send
        self._recv = self._reader.recv
        self._poll = self._reader.poll

    def put(self, obj, block=True, timeout=None):
        """Put *obj* on the queue.

        Waits (at most *timeout* seconds if given) for a free slot when the
        queue is full; raises ``Full`` if none becomes available, or at once
        when *block* is false.  The object is only buffered here -- the
        feeder thread writes it to the pipe afterwards.
        """
        assert not self._closed
        if not self._sem.acquire(block, timeout):
            raise Full

        self._notempty.acquire()
        try:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()     # wake the feeder thread
        finally:
            self._notempty.release()

    def get(self, block=True, timeout=None):
        """Remove and return an object from the queue.

        With *block* true and no *timeout*, waits indefinitely.  Otherwise
        raises ``Empty`` if the read lock or an object cannot be obtained
        within *timeout* seconds (immediately when *block* is false).
        """
        if block and timeout is None:
            self._rlock.acquire()
            try:
                res = self._recv()
                self._sem.release()     # free the slot taken by put()
                return res
            finally:
                self._rlock.release()

        if block:
            deadline = time.time() + timeout
        if not self._rlock.acquire(block, timeout):
            raise Empty
        try:
            if block:
                # Time spent waiting for the read lock counts against the
                # caller's timeout; never hand poll() a negative value.
                remaining = max(deadline - time.time(), 0.0)
            else:
                remaining = 0.0
            if not self._poll(remaining):
                raise Empty
            res = self._recv()
            self._sem.release()
            return res
        finally:
            self._rlock.release()

    def qsize(self):
        """Return the approximate number of items in the queue."""
        # Raises NotImplementedError on Mac OS X because of broken
        # sem_getvalue()
        return self._maxsize - self._sem._semlock._get_value()

    def empty(self):
        """Return True if nothing is currently readable from the pipe."""
        return not self._poll()

    def full(self):
        """Return True if no free slot is left for put()."""
        return self._sem._semlock._is_zero()

    def get_nowait(self):
        """Equivalent to ``get(False)``."""
        return self.get(False)

    def put_nowait(self, obj):
        """Equivalent to ``put(obj, False)``."""
        return self.put(obj, False)

    def close(self):
        """Stop accepting puts from this process.

        Closes the read end of the pipe and, if the feeder thread was
        started, appends the sentinel so the thread exits after sending
        everything buffered before it.
        """
        self._closed = True
        self._reader.close()
        if self._close:
            self._close()

    def join_thread(self):
        """Wait for the feeder thread to finish; only valid after close().

        A no-op when no join finalizer was registered (for example in the
        process that created the queue, or after cancel_join_thread()).
        """
        debug('Queue.join_thread()')
        assert self._closed
        if self._jointhread:
            self._jointhread()

    def cancel_join_thread(self):
        """Do not wait for the feeder thread when this process exits."""
        debug('Queue.cancel_join_thread()')
        self._joincancelled = True
        try:
            self._jointhread.cancel()
        except AttributeError:
            # No join finalizer registered yet (_jointhread is None).
            pass

    def _start_thread(self):
        """Start the feeder thread and register its finalizers."""
        debug('Queue._start_thread()')

        # Start thread which transfers data from buffer to pipe
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(self._buffer, self._notempty, self._send,
                  self._wlock, self._writer.close),
            name='QueueFeederThread'
            )
        # Use the ``daemon`` property: released threading modules (2.6+,
        # 3.x) have no ``set_daemon()`` method.
        self._thread.daemon = True

        debug('doing self._thread.start()')
        self._thread.start()
        debug('... done self._thread.start()')

        # On process exit we will wait for data to be flushed to pipe.
        #
        # However, if this process created the queue then all
        # processes which use the queue will be descendants of this
        # process.  Therefore waiting for the queue to be flushed
        # is pointless once all the child processes have been joined.
        created_by_this_process = (self._opid == os.getpid())
        if not self._joincancelled and not created_by_this_process:
            self._jointhread = Finalize(
                self._thread, Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5
                )

        # Send sentinel to the thread queue object when garbage collected
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10
            )

    @staticmethod
    def _finalize_join(twr):
        """Join the feeder thread referenced by weakref *twr*, if alive."""
        debug('joining queue thread')
        thread = twr()
        if thread is not None:
            thread.join()
            debug('... queue thread joined')
        else:
            debug('... queue thread already dead')

    @staticmethod
    def _finalize_close(buffer, notempty):
        """Append the sentinel to *buffer* and wake the feeder thread."""
        debug('telling queue thread to quit')
        notempty.acquire()
        try:
            buffer.append(_sentinel)
            notempty.notify()
        finally:
            notempty.release()

    @staticmethod
    def _feed(buffer, notempty, send, writelock, close):
        """Feeder-thread body: move objects from *buffer* into the pipe.

        Waits on *notempty* until the buffer has data, then sends every
        buffered object with *send* (holding *writelock* unless on win32).
        On popping the sentinel it calls *close* and returns.
        """
        debug('starting thread to feed data to pipe')
        # Imported here rather than at module level, as in the original.
        from multiprocessing.util import is_exiting

        nacquire = notempty.acquire
        nrelease = notempty.release
        nwait = notempty.wait
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != 'win32':
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            wacquire = None

        try:
            while True:
                nacquire()
                try:
                    if not buffer:
                        nwait()
                finally:
                    nrelease()
                try:
                    while True:
                        obj = bpopleft()
                        if obj is sentinel:
                            debug('feeder thread got sentinel -- exiting')
                            close()
                            return

                        if wacquire is None:
                            send(obj)
                        else:
                            wacquire()
                            try:
                                send(obj)
                            finally:
                                wrelease()
                except IndexError:
                    # Buffer drained; go back to waiting for more data.
                    pass
        except Exception as e:
            # Since this runs in a daemon thread the resources it uses
            # may become unusable while the process is cleaning up.
            # We ignore errors which happen after the process has
            # started to cleanup.
            try:
                if is_exiting():
                    info('error in queue thread: %s', e)
                else:
                    import traceback
                    traceback.print_exc()
            except Exception:
                pass

# Unique marker that _finalize_close() appends to the buffer; the feeder
# thread exits when it pops this object.
_sentinel = object()
|
| 262 |
|
| 263 | #
|
| 264 | # A queue type which also supports join() and task_done() methods
|
| 265 | #
|
| 266 | # Note that if you do not call task_done() for each finished task then
|
| 267 | # eventually the counter's semaphore may overflow causing Bad Things
|
| 268 | # to happen.
|
| 269 | #
|
| 270 |
|
class JoinableQueue(Queue):
    """Queue that also supports ``task_done()`` and ``join()``.

    ``_unfinished_tasks`` counts items that were put but not yet marked
    done with ``task_done()``; ``join()`` waits on ``_cond`` until that
    count reaches zero.
    """

    def __init__(self, maxsize=0):
        Queue.__init__(self, maxsize)
        self._unfinished_tasks = Semaphore(0)
        self._cond = Condition()

    def __getstate__(self):
        """Extend Queue's pickled state with the task counter and condition."""
        return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)

    def __setstate__(self, state):
        """Restore state produced by __getstate__()."""
        Queue.__setstate__(self, state[:-2])
        self._cond, self._unfinished_tasks = state[-2:]

    def put(self, item, block=True, timeout=None):
        """Put *item* on the queue and count it as an unfinished task.

        Raises ``Full`` under the same conditions as ``Queue.put()``; in
        that case the unfinished-task count is left unchanged.
        """
        assert not self._closed
        if not self._sem.acquire(block, timeout):
            raise Full

        # Raise the unfinished-task count while holding _notempty (so the
        # feeder thread cannot ship the item to a consumer first) and _cond
        # (so it is ordered with task_done()/join()).  Incrementing only
        # after the item was already handed to the feeder let a fast
        # consumer call task_done() too early and get a spurious
        # ValueError.
        self._notempty.acquire()
        self._cond.acquire()
        try:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(item)
            self._unfinished_tasks.release()
            self._notempty.notify()
        finally:
            self._cond.release()
            self._notempty.release()

    def task_done(self):
        """Mark one previously put item as processed.

        Raises ValueError if called more times than items were put.  Wakes
        every join() waiter once the unfinished count drops to zero.
        """
        self._cond.acquire()
        try:
            if not self._unfinished_tasks.acquire(False):
                raise ValueError('task_done() called too many times')
            if self._unfinished_tasks._semlock._is_zero():
                self._cond.notify_all()
        finally:
            self._cond.release()

    def join(self):
        """Block until every item put has been marked done by task_done()."""
        self._cond.acquire()
        try:
            # Re-check the predicate after each wakeup instead of trusting
            # a single wait() to mean "all tasks are done".
            while not self._unfinished_tasks._semlock._is_zero():
                self._cond.wait()
        finally:
            self._cond.release()
|
| 306 |
|
| 307 | #
|
| 308 | # Simplified Queue type -- really just a locked pipe
|
| 309 | #
|
| 310 |
|
class SimpleQueue(object):
    """Minimal queue: a one-way pipe with a lock guarding each end.

    There is no buffer and no feeder thread -- ``put()`` writes straight
    into the pipe and ``get()`` reads straight out of it.
    """

    def __init__(self):
        self._reader, self._writer = Pipe(duplex=False)
        self._rlock = Lock()
        # win32 gets no write lock (see _make_methods).
        self._wlock = None if sys.platform == 'win32' else Lock()
        self._make_methods()

    def empty(self):
        """Return True if nothing is waiting to be read from the pipe."""
        has_data = self._reader.poll()
        return not has_data

    def __getstate__(self):
        """Return the pipe ends and locks for transfer to a child process."""
        assert_spawning(self)
        state = (self._reader, self._writer, self._rlock, self._wlock)
        return state

    def __setstate__(self, state):
        """Restore pipe ends and locks, then rebuild get()/put()."""
        self._reader, self._writer, self._rlock, self._wlock = state
        self._make_methods()

    def _make_methods(self):
        """Bind ``self.get`` and ``self.put`` to lock-guarded closures."""
        receive = self._reader.recv
        lock_read, unlock_read = self._rlock.acquire, self._rlock.release

        def get():
            lock_read()
            try:
                return receive()
            finally:
                unlock_read()

        self.get = get

        if self._wlock is not None:
            transmit = self._writer.send
            lock_write, unlock_write = self._wlock.acquire, self._wlock.release

            def put(obj):
                lock_write()
                try:
                    return transmit(obj)
                finally:
                    unlock_write()

            self.put = put
        else:
            # writes to a message oriented win32 pipe are atomic
            self.put = self._writer.send
|