blob: 10e694f13933f1651834a5521767cdd968f560c0 [file] [log] [blame]
"""Queues: FIFO, priority and LIFO queues for coordinating coroutines."""

# Public names exported by ``from <this module> import *``.
__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004
5import collections
6import heapq
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007
8from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009from . import locks
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010
11
class QueueEmpty(Exception):
    """Exception raised when Queue.get_nowait() is called on a Queue object
    which is empty.
    """
17
18
class QueueFull(Exception):
    """Exception raised when the Queue.put_nowait() method is called on a Queue
    object which is full.
    """
24
25
class Queue:
    """A queue, useful for coordinating producer and consumer coroutines.

    If maxsize is less than or equal to zero, the queue size is infinite. If it
    is an integer greater than 0, then "await put()" will block when the
    queue reaches maxsize, until an item is removed by get().

    Unlike the standard library Queue, you can reliably know this Queue's size
    with qsize(), since your single-threaded asyncio application won't be
    interrupted between calling qsize() and doing an operation on the Queue.
    """

    def __init__(self, maxsize=0, *, loop=None):
        """Create a queue holding at most *maxsize* items (<= 0: unbounded).

        *loop* is the event loop used to create waiter futures; when it is
        None the loop returned by events.get_event_loop() is used.
        """
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._maxsize = maxsize

        # Futures of get() callers waiting for an item to arrive.
        self._getters = collections.deque()
        # Futures of put() callers waiting for a free slot.
        self._putters = collections.deque()
        # Number of items put but not yet acknowledged via task_done().
        self._unfinished_tasks = 0
        # Set whenever _unfinished_tasks is zero; join() waits on it.
        self._finished = locks.Event(loop=self._loop)
        self._finished.set()
        self._init(maxsize)

    # These three are overridable in subclasses.

    def _init(self, maxsize):
        """Create the underlying item container (a deque for FIFO order)."""
        self._queue = collections.deque()

    def _get(self):
        """Remove and return the next item from the container."""
        return self._queue.popleft()

    def _put(self, item):
        """Store *item* in the container."""
        self._queue.append(item)

    # End of the overridable methods.

    def _wakeup_next(self, waiters):
        """Resolve the first future in *waiters* that is not already done.

        Futures that are already done (e.g. cancelled) are discarded along
        the way.
        """
        # Wake up the next waiter (if any) that isn't cancelled.
        while waiters:
            waiter = waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                break

    def __repr__(self):
        return '<{} at {:#x} {}>'.format(
            type(self).__name__, id(self), self._format())

    def __str__(self):
        return '<{} {}>'.format(type(self).__name__, self._format())

    def _format(self):
        """Return the state summary shared by __repr__() and __str__()."""
        result = 'maxsize={!r}'.format(self._maxsize)
        # getattr() guards against formatting before _init() has created
        # the container.
        if getattr(self, '_queue', None):
            result += ' _queue={!r}'.format(list(self._queue))
        if self._getters:
            result += ' _getters[{}]'.format(len(self._getters))
        if self._putters:
            result += ' _putters[{}]'.format(len(self._putters))
        if self._unfinished_tasks:
            result += ' tasks={}'.format(self._unfinished_tasks)
        return result

    def qsize(self):
        """Number of items in the queue."""
        return len(self._queue)

    @property
    def maxsize(self):
        """Number of items allowed in the queue."""
        return self._maxsize

    def empty(self):
        """Return True if the queue is empty, False otherwise."""
        return not self._queue

    def full(self):
        """Return True if there are maxsize items in the queue.

        Note: if the Queue was initialized with maxsize=0 (the default),
        then full() is never True.
        """
        if self._maxsize <= 0:
            return False
        else:
            return self.qsize() >= self._maxsize

    async def put(self, item):
        """Put an item into the queue.

        Put an item into the queue. If the queue is full, wait until a free
        slot is available before adding item.
        """
        while self.full():
            putter = self._loop.create_future()
            self._putters.append(putter)
            try:
                await putter
            # Deliberately bare: cancellation must be intercepted too, and
            # the exception is always re-raised below.
            except:
                putter.cancel()  # Just in case putter is not done yet.

                try:
                    # Drop our future so abandoned putters do not pile up
                    # in self._putters (same cleanup as in get()).
                    self._putters.remove(putter)
                except ValueError:
                    # Already popped by _wakeup_next().
                    pass

                if not self.full() and not putter.cancelled():
                    # We were woken up by get_nowait(), but can't take
                    # the call.  Wake up the next in line.
                    self._wakeup_next(self._putters)
                raise
        return self.put_nowait(item)

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        If no free slot is immediately available, raise QueueFull.
        """
        if self.full():
            raise QueueFull
        self._put(item)
        self._unfinished_tasks += 1
        self._finished.clear()
        self._wakeup_next(self._getters)

    async def get(self):
        """Remove and return an item from the queue.

        If queue is empty, wait until an item is available.
        """
        while self.empty():
            getter = self._loop.create_future()
            self._getters.append(getter)
            try:
                await getter
            # Deliberately bare: cancellation must be intercepted too, and
            # the exception is always re-raised below.
            except:
                getter.cancel()  # Just in case getter is not done yet.

                try:
                    # Drop our future so abandoned getters do not pile up
                    # in self._getters.
                    self._getters.remove(getter)
                except ValueError:
                    # Already popped by _wakeup_next().
                    pass

                if not self.empty() and not getter.cancelled():
                    # We were woken up by put_nowait(), but can't take
                    # the call.  Wake up the next in line.
                    self._wakeup_next(self._getters)
                raise
        return self.get_nowait()

    def get_nowait(self):
        """Remove and return an item from the queue.

        Return an item if one is immediately available, else raise QueueEmpty.
        """
        if self.empty():
            raise QueueEmpty
        item = self._get()
        self._wakeup_next(self._putters)
        return item

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by queue consumers. For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items have
        been processed (meaning that a task_done() call was received for every
        item that had been put() into the queue).

        Raises ValueError if called more times than there were items placed in
        the queue.
        """
        if self._unfinished_tasks <= 0:
            raise ValueError('task_done() called too many times')
        self._unfinished_tasks -= 1
        if self._unfinished_tasks == 0:
            self._finished.set()

    async def join(self):
        """Block until all items in the queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer calls task_done() to
        indicate that the item was retrieved and all work on it is complete.
        When the count of unfinished tasks drops to zero, join() unblocks.
        """
        if self._unfinished_tasks > 0:
            await self._finished.wait()
Victor Stinner4cb814c2015-02-17 22:53:28 +0100216
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700217
class PriorityQueue(Queue):
    """A subclass of Queue; retrieves entries in priority order (lowest first).

    Entries are typically tuples of the form: (priority number, data).
    """

    def _init(self, maxsize):
        """Create the underlying container: a plain list used as a heap."""
        self._queue = []

    # The heapq functions are bound as default arguments, so they are looked
    # up once when the methods are defined rather than on every call.

    def _put(self, item, heappush=heapq.heappush):
        """Push *item* onto the heap."""
        heappush(self._queue, item)

    def _get(self, heappop=heapq.heappop):
        """Pop and return the smallest item from the heap."""
        return heappop(self._queue)
232
233
class LifoQueue(Queue):
    """A subclass of Queue that retrieves most recently added entries first."""

    def _init(self, maxsize):
        """Create the underlying container: a plain list used as a stack."""
        self._queue = []

    def _put(self, item):
        """Push *item* onto the top of the stack."""
        self._queue.append(item)

    def _get(self):
        """Pop and return the most recently added item."""
        return self._queue.pop()