blob: e3a1d5ed60e3b8686ce9fce40c7f12f2a585d517 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Queues"""
2
Victor Stinnereaf16ab2015-07-25 02:40:40 +02003__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004
5import collections
6import heapq
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007
Victor Stinnereaf16ab2015-07-25 02:40:40 +02008from . import compat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009from . import events
10from . import futures
11from . import locks
Victor Stinnere6ecea52015-07-09 23:13:50 +020012from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070013
14
class QueueEmpty(Exception):
    """Raised by Queue.get_nowait() when the queue holds no items."""
20
21
class QueueFull(Exception):
    """Raised by Queue.put_nowait() when the queue has no free slot."""
27
28
class Queue:
    """A queue, useful for coordinating producer and consumer coroutines.

    If maxsize is less than or equal to zero, the queue size is infinite. If it
    is an integer greater than 0, then "yield from put()" will block when the
    queue reaches maxsize, until an item is removed by get().

    Unlike the standard library Queue, you can reliably know this Queue's size
    with qsize(), since your single-threaded asyncio application won't be
    interrupted between calling qsize() and doing an operation on the Queue.
    """

    def __init__(self, maxsize=0, *, loop=None):
        """Create a queue.

        maxsize -- capacity limit; values <= 0 mean "unbounded".
        loop -- event loop to use; defaults to events.get_event_loop().
        """
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._maxsize = maxsize

        # Futures created by get() while it waits for an item.
        self._getters = collections.deque()
        # Futures created by put() while it waits for a free slot.
        self._putters = collections.deque()
        # Incremented by put_nowait(), decremented by task_done().
        self._unfinished_tasks = 0
        # Set whenever _unfinished_tasks is zero; join() waits on it.
        self._finished = locks.Event(loop=self._loop)
        self._finished.set()
        self._init(maxsize)

    # These three are overridable in subclasses.

    def _init(self, maxsize):
        """Create the underlying item container (a FIFO deque here)."""
        self._queue = collections.deque()

    def _get(self):
        """Remove and return the next item from the container."""
        return self._queue.popleft()

    def _put(self, item):
        """Store item in the container."""
        self._queue.append(item)

    # End of the overridable methods.

    def _wakeup_next(self, waiters):
        """Wake up the next waiter (if any) that isn't already done.

        Futures that are already done (e.g. cancelled) are popped and
        skipped; at most one pending future gets its result set.
        """
        while waiters:
            waiter = waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                break

    def __repr__(self):
        return '<{} at {:#x} {}>'.format(
            type(self).__name__, id(self), self._format())

    def __str__(self):
        return '<{} {}>'.format(type(self).__name__, self._format())

    def _format(self):
        """Return the state summary shared by __repr__() and __str__()."""
        result = 'maxsize={!r}'.format(self._maxsize)
        # getattr(): tolerate a missing _queue attribute.
        if getattr(self, '_queue', None):
            result += ' _queue={!r}'.format(list(self._queue))
        if self._getters:
            result += ' _getters[{}]'.format(len(self._getters))
        if self._putters:
            result += ' _putters[{}]'.format(len(self._putters))
        if self._unfinished_tasks:
            result += ' tasks={}'.format(self._unfinished_tasks)
        return result

    def qsize(self):
        """Number of items in the queue."""
        return len(self._queue)

    @property
    def maxsize(self):
        """Number of items allowed in the queue."""
        return self._maxsize

    def empty(self):
        """Return True if the queue is empty, False otherwise."""
        return not self._queue

    def full(self):
        """Return True if there are maxsize items in the queue.

        Note: if the Queue was initialized with maxsize=0 (the default),
        then full() is never True.
        """
        if self._maxsize <= 0:
            return False
        else:
            return self.qsize() >= self._maxsize

    @coroutine
    def put(self, item):
        """Put an item into the queue.

        If the queue is full, wait until a free slot is available before
        adding item.

        This method is a coroutine.
        """
        while self.full():
            putter = futures.Future(loop=self._loop)
            self._putters.append(putter)
            try:
                yield from putter
            except:
                # Bare except on purpose: whatever interrupted the wait
                # is re-raised below after the bookkeeping is done.
                putter.cancel()  # Just in case putter is not done yet.
                try:
                    # Drop our future from the wait list; otherwise
                    # abandoned waiters pile up until some later
                    # _wakeup_next() call happens to pop them.
                    self._putters.remove(putter)
                except ValueError:
                    # Already popped by _wakeup_next().
                    pass
                if not self.full() and not putter.cancelled():
                    # We were woken up by get_nowait(), but can't take
                    # the call.  Wake up the next in line.
                    self._wakeup_next(self._putters)
                raise
        return self.put_nowait(item)

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        If no free slot is immediately available, raise QueueFull.
        """
        if self.full():
            raise QueueFull
        self._put(item)
        self._unfinished_tasks += 1
        self._finished.clear()
        self._wakeup_next(self._getters)

    @coroutine
    def get(self):
        """Remove and return an item from the queue.

        If queue is empty, wait until an item is available.

        This method is a coroutine.
        """
        while self.empty():
            getter = futures.Future(loop=self._loop)
            self._getters.append(getter)
            try:
                yield from getter
            except:
                # Bare except on purpose: whatever interrupted the wait
                # is re-raised below after the bookkeeping is done.
                getter.cancel()  # Just in case getter is not done yet.
                try:
                    # Drop our future from the wait list; otherwise
                    # abandoned waiters pile up until some later
                    # _wakeup_next() call happens to pop them.
                    self._getters.remove(getter)
                except ValueError:
                    # Already popped by _wakeup_next().
                    pass
                if not self.empty() and not getter.cancelled():
                    # We were woken up by put_nowait(), but can't take
                    # the call.  Wake up the next in line.
                    self._wakeup_next(self._getters)
                raise
        return self.get_nowait()

    def get_nowait(self):
        """Remove and return an item from the queue.

        Return an item if one is immediately available, else raise QueueEmpty.
        """
        if self.empty():
            raise QueueEmpty
        item = self._get()
        self._wakeup_next(self._putters)
        return item

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by queue consumers. For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items have
        been processed (meaning that a task_done() call was received for every
        item that had been put() into the queue).

        Raises ValueError if called more times than there were items placed in
        the queue.
        """
        if self._unfinished_tasks <= 0:
            raise ValueError('task_done() called too many times')
        self._unfinished_tasks -= 1
        if self._unfinished_tasks == 0:
            self._finished.set()

    @coroutine
    def join(self):
        """Block until all items in the queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer calls task_done() to
        indicate that the item was retrieved and all work on it is complete.
        When the count of unfinished tasks drops to zero, join() unblocks.
        """
        if self._unfinished_tasks > 0:
            yield from self._finished.wait()
220
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700221
class PriorityQueue(Queue):
    """A Queue variant that hands out entries in priority order.

    The lowest entry is retrieved first.  Entries are typically tuples of
    the form: (priority number, data).
    """

    def _init(self, maxsize):
        # A plain list, maintained as a heap by _put() and _get().
        self._queue = list()

    def _put(self, item, heappush=heapq.heappush):
        # heappush is bound as a default argument when the class is defined.
        heap = self._queue
        heappush(heap, item)

    def _get(self, heappop=heapq.heappop):
        # heappop is bound as a default argument when the class is defined.
        heap = self._queue
        return heappop(heap)
236
237
class LifoQueue(Queue):
    """A Queue variant that returns the most recently added entry first."""

    def _init(self, maxsize):
        # A plain list used as a stack.
        self._queue = list()

    def _put(self, item):
        # Push onto the end of the list.
        stack = self._queue
        stack.append(item)

    def _get(self):
        # Pop from the same end that _put() pushes to.
        stack = self._queue
        return stack.pop()
249
250
if not compat.PY35:
    # Deprecated alias for Queue.
    JoinableQueue = Queue
    __all__.append('JoinableQueue')