#
# Module implementing queues
#
# multiprocessing/queues.py
#
# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
#
# Public names of this module; JoinableQueue is defined below alongside
# Queue and SimpleQueue and is exported with them.
__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']
import sys | |
import os | |
import threading | |
import collections | |
import time | |
import atexit | |
import weakref | |
from Queue import Empty, Full | |
import _multiprocessing | |
from multiprocessing import Pipe | |
from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition | |
from multiprocessing.util import debug, info, Finalize, register_after_fork | |
from multiprocessing.forking import assert_spawning | |
#
# Queue type using a pipe, buffer and thread
#
class Queue(object):
    """Queue type using a pipe, a buffer and a thread.

    put() appends objects to an in-process deque.  A daemon "feeder" thread,
    started lazily by the first put(), pops objects from the deque and sends
    them down the write end of a one-way pipe.  get() receives objects from
    the read end of that pipe.  A bounded semaphore is acquired by put() and
    released by get(), which caps the number of queued items at ``maxsize``.
    """

    def __init__(self, maxsize=0):
        """Create the pipe, the locks and the slot-counting semaphore.

        A ``maxsize`` of zero or less selects
        ``_multiprocessing.SemLock.SEM_VALUE_MAX`` as the limit.
        """
        if maxsize <= 0:
            maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
        self._maxsize = maxsize
        self._reader, self._writer = Pipe(duplex=False)
        self._rlock = Lock()
        # PID of the creating process; _start_thread() compares against it.
        self._opid = os.getpid()
        if sys.platform == 'win32':
            # No write lock on win32: the feeder thread then sends unlocked.
            self._wlock = None
        else:
            self._wlock = Lock()
        self._sem = BoundedSemaphore(maxsize)

        self._after_fork()

        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

    def __getstate__(self):
        """Return the picklable state; only permitted while spawning."""
        assert_spawning(self)
        return (self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def __setstate__(self, state):
        """Restore the shared state, then rebuild the per-process state."""
        (self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._after_fork()

    def _after_fork(self):
        """(Re)initialise everything that is private to the current process.

        Called from __init__() and __setstate__(), and registered with
        register_after_fork() on non-win32 platforms.
        """
        debug('Queue._after_fork()')
        self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()
        self._thread = None
        self._jointhread = None
        self._joincancelled = False
        self._closed = False
        self._close = None
        # Cache the bound connection methods used by put()/get()/empty().
        self._send = self._writer.send
        self._recv = self._reader.recv
        self._poll = self._reader.poll

    def put(self, obj, block=True, timeout=None):
        """Append ``obj`` to the buffer for the feeder thread to send.

        Raises Full when a free slot cannot be acquired from the semaphore
        (using ``block`` and ``timeout`` for the acquire).
        """
        assert not self._closed
        if not self._sem.acquire(block, timeout):
            raise Full

        self._notempty.acquire()
        try:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()
        finally:
            self._notempty.release()

    def get(self, block=True, timeout=None):
        """Receive and return one object from the pipe.

        With ``block`` true and ``timeout`` None this waits without a time
        limit.  Otherwise ``timeout`` bounds the total time spent acquiring
        the read lock and polling the pipe (no waiting at all when ``block``
        is false).  Raises Empty when the read lock cannot be acquired or the
        poll reports no data.
        """
        if block and timeout is None:
            self._rlock.acquire()
            try:
                res = self._recv()
                self._sem.release()
                return res
            finally:
                self._rlock.release()

        else:
            if block:
                deadline = time.time() + timeout
            if not self._rlock.acquire(block, timeout):
                raise Empty
            try:
                if block:
                    # Waiting for the read lock may already have used up the
                    # whole timeout; never hand poll() a negative value, but
                    # still poll once so available data is returned.
                    remaining = max(0.0, deadline - time.time())
                else:
                    remaining = 0.0
                if not self._poll(remaining):
                    raise Empty
                res = self._recv()
                self._sem.release()
                return res
            finally:
                self._rlock.release()

    def qsize(self):
        """Return ``maxsize`` minus the semaphore's current value."""
        # Raises NotImplementedError on Mac OSX because of broken
        # sem_getvalue()
        return self._maxsize - self._sem._semlock._get_value()

    def empty(self):
        """Return True if polling the read end of the pipe finds no data."""
        return not self._poll()

    def full(self):
        """Return True if the slot-counting semaphore is at zero."""
        return self._sem._semlock._is_zero()

    def get_nowait(self):
        """Equivalent to ``get(False)``."""
        return self.get(False)

    def put_nowait(self, obj):
        """Equivalent to ``put(obj, False)``."""
        return self.put(obj, False)

    def close(self):
        """Mark the queue closed, close the reader and signal the feeder."""
        self._closed = True
        try:
            self._reader.close()
        finally:
            # Run the finalizer even if closing the reader failed, so a
            # started feeder thread still receives its sentinel.
            if self._close:
                self._close()

    def join_thread(self):
        """Run the join finalizer, if one was registered.

        May only be called after close().
        """
        debug('Queue.join_thread()')
        assert self._closed
        if self._jointhread:
            self._jointhread()

    def cancel_join_thread(self):
        """Cancel the join finalizer and prevent one from being registered."""
        debug('Queue.cancel_join_thread()')
        self._joincancelled = True
        # _jointhread stays None until _start_thread() registers a finalizer.
        if self._jointhread is not None:
            self._jointhread.cancel()

    def _start_thread(self):
        """Start the feeder thread and register its finalizers."""
        debug('Queue._start_thread()')

        # Start thread which transfers data from buffer to pipe
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(self._buffer, self._notempty, self._send,
                  self._wlock, self._writer.close),
            name='QueueFeederThread'
            )
        self._thread.setDaemon(True)

        debug('doing self._thread.start()')
        self._thread.start()
        debug('... done self._thread.start()')

        # On process exit we will wait for data to be flushed to pipe.
        #
        # However, if this process created the queue then all
        # processes which use the queue will be descendants of this
        # process.  Therefore waiting for the queue to be flushed
        # is pointless once all the child processes have been joined.
        created_by_this_process = (self._opid == os.getpid())
        if not self._joincancelled and not created_by_this_process:
            self._jointhread = Finalize(
                self._thread, Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5
                )

        # Send sentinel to the thread queue object when garbage collected
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10
            )

    @staticmethod
    def _finalize_join(twr):
        """Join the feeder thread referenced by weakref ``twr`` if alive."""
        debug('joining queue thread')
        thread = twr()
        if thread is not None:
            thread.join()
            debug('... queue thread joined')
        else:
            debug('... queue thread already dead')

    @staticmethod
    def _finalize_close(buffer, notempty):
        """Append the sentinel to ``buffer`` and wake the feeder thread."""
        debug('telling queue thread to quit')
        notempty.acquire()
        try:
            buffer.append(_sentinel)
            notempty.notify()
        finally:
            notempty.release()

    @staticmethod
    def _feed(buffer, notempty, send, writelock, close):
        """Body of the feeder thread: move objects from ``buffer`` to the pipe.

        ``notempty`` is the condition guarding ``buffer``; ``send`` writes one
        object to the pipe; ``writelock`` serialises sends on non-win32
        platforms; ``close`` is called once the sentinel is popped, after
        which the thread returns.
        """
        debug('starting thread to feed data to pipe')
        from .util import is_exiting

        # Bind the hot-loop callables to locals.
        nacquire = notempty.acquire
        nrelease = notempty.release
        nwait = notempty.wait
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != 'win32':
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            wacquire = None

        try:
            while 1:
                # Sleep until something has been appended to the buffer.
                nacquire()
                try:
                    if not buffer:
                        nwait()
                finally:
                    nrelease()
                try:
                    # Drain the buffer; popleft() raises IndexError when it
                    # runs dry, which sends us back to waiting.
                    while 1:
                        obj = bpopleft()
                        if obj is sentinel:
                            debug('feeder thread got sentinel -- exiting')
                            close()
                            return

                        if wacquire is None:
                            send(obj)
                        else:
                            wacquire()
                            try:
                                send(obj)
                            finally:
                                wrelease()
                except IndexError:
                    pass
        except Exception:
            # Since this runs in a daemon thread the resources it uses
            # may become unusable while the process is cleaning up.
            # We ignore errors which happen after the process has
            # started to cleanup.
            try:
                # Fetched through sys.exc_info() (inside the guarded region)
                # so that the except clause itself is version-neutral.
                e = sys.exc_info()[1]
                if is_exiting():
                    info('error in queue thread: %s', e)
                else:
                    import traceback
                    traceback.print_exc()
            except Exception:
                pass
# Unique marker object: Queue._finalize_close() appends it to a queue's
# buffer, and the feeder thread (Queue._feed) exits when it pops an object
# that is identical to it.
_sentinel = object()
#
# A queue type which also supports join() and task_done() methods
#
# Note that if you do not call task_done() for each finished task then
# eventually the counter's semaphore may overflow, causing Bad Things
# to happen.
#
class JoinableQueue(Queue):
    """Queue that also supports join() and task_done().

    A semaphore counts the tasks that have been put but not yet marked done;
    a condition lets join() sleep until task_done() brings that count to zero.
    """

    def __init__(self, maxsize=0):
        """Create the queue plus the unfinished-task counter and condition."""
        Queue.__init__(self, maxsize)
        self._unfinished_tasks = Semaphore(0)
        self._cond = Condition()

    def __getstate__(self):
        """Return Queue's state extended with the condition and counter."""
        return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)

    def __setstate__(self, state):
        """Restore Queue's state, then the condition and counter."""
        Queue.__setstate__(self, state[:-2])
        self._cond, self._unfinished_tasks = state[-2:]

    def put(self, item, block=True, timeout=None):
        """Queue ``item`` and count it as an unfinished task.

        Raises Full when a free slot cannot be acquired from the semaphore.

        The counter is incremented while both the buffer lock and the
        condition are held, i.e. before the feeder thread can send the item.
        Incrementing only after the item has been queued would let a fast
        consumer call task_done() first and get a spurious ValueError.
        """
        assert not self._closed
        if not self._sem.acquire(block, timeout):
            raise Full

        self._notempty.acquire()
        try:
            self._cond.acquire()
            try:
                if self._thread is None:
                    self._start_thread()
                self._buffer.append(item)
                self._unfinished_tasks.release()
                self._notempty.notify()
            finally:
                self._cond.release()
        finally:
            self._notempty.release()

    def task_done(self):
        """Mark one task finished; wake join() waiters when none remain.

        Raises ValueError if there is no unfinished task left to mark.
        """
        self._cond.acquire()
        try:
            if not self._unfinished_tasks.acquire(False):
                raise ValueError('task_done() called too many times')
            if self._unfinished_tasks._semlock._is_zero():
                self._cond.notify_all()
        finally:
            self._cond.release()

    def join(self):
        """Wait on the condition unless the unfinished-task count is zero."""
        self._cond.acquire()
        try:
            if not self._unfinished_tasks._semlock._is_zero():
                self._cond.wait()
        finally:
            self._cond.release()
#
# Simplified Queue type -- really just a locked pipe
#
class SimpleQueue(object):
    """Minimal queue: a one-way pipe whose two ends are guarded by locks.

    get() and put() are not ordinary methods; _make_methods() builds them as
    closures and stores them on the instance.
    """

    def __init__(self):
        """Open the pipe, create the locks and build get()/put()."""
        reader, writer = Pipe(duplex=False)
        self._reader = reader
        self._writer = writer
        self._rlock = Lock()
        if sys.platform != 'win32':
            self._wlock = Lock()
        else:
            # No write lock on win32; put() then maps straight onto send().
            self._wlock = None
        self._make_methods()

    def empty(self):
        """Return True when polling the read end finds nothing to receive."""
        has_data = self._reader.poll()
        return not has_data

    def __getstate__(self):
        """Return the picklable state; only permitted while spawning."""
        assert_spawning(self)
        state = (self._reader, self._writer, self._rlock, self._wlock)
        return state

    def __setstate__(self, state):
        """Restore both pipe ends and both locks, then rebuild get()/put()."""
        reader, writer, read_lock, write_lock = state
        self._reader = reader
        self._writer = writer
        self._rlock = read_lock
        self._wlock = write_lock
        self._make_methods()

    def _make_methods(self):
        """Install ``get`` and ``put`` closures as instance attributes."""
        receive = self._reader.recv
        lock_read = self._rlock.acquire
        unlock_read = self._rlock.release

        def get():
            # Receive one object while holding the read lock.
            lock_read()
            try:
                return receive()
            finally:
                unlock_read()

        self.get = get

        deliver = self._writer.send
        write_lock = self._wlock
        if write_lock is not None:
            lock_write = write_lock.acquire
            unlock_write = write_lock.release

            def put(obj):
                # Send one object while holding the write lock.
                lock_write()
                try:
                    return deliver(obj)
                finally:
                    unlock_write()

            self.put = put
        else:
            # writes to a message oriented win32 pipe are atomic
            self.put = deliver