blob: 390ae9a6821c4d4e3790458c1d5615f396b4fa14 [file] [log] [blame]
# Names exported by "from <this module> import *".
__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')

import collections
import heapq
import warnings

# events supplies get_event_loop(); locks supplies the Event used by join().
from . import events
from . import locks
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009
10
class QueueEmpty(Exception):
    """Signals that Queue.get_nowait() found no item to return."""
14
15
class QueueFull(Exception):
    """Signals that Queue.put_nowait() was called while the Queue was full."""
19
20
class Queue:
    """FIFO queue for handing items between producer and consumer coroutines.

    A maxsize of zero or less means the queue never fills up.  With a
    positive integer maxsize, "await put()" suspends the caller once the
    queue holds maxsize items, and resumes after get() has removed one.

    In contrast to the standard library Queue, the value reported by qsize()
    can be relied upon: a single-threaded asyncio application is not
    interrupted between reading qsize() and the next operation on the Queue.
    """

    def __init__(self, maxsize=0, *, loop=None):
        """Create an empty queue.

        maxsize -- capacity limit; zero or a negative value disables it.
        loop -- deprecated.  When given, it is used to create the waiter
            futures and a DeprecationWarning is issued; otherwise
            events.get_event_loop() is used.
        """
        if loop is not None:
            self._loop = loop
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        else:
            self._loop = events.get_event_loop()
        self._maxsize = maxsize

        # Futures of coroutines suspended in get(), oldest first.
        self._getters = collections.deque()
        # Futures of coroutines suspended in put(), oldest first.
        self._putters = collections.deque()
        # Items added through put_nowait() and not yet matched by task_done().
        self._unfinished_tasks = 0
        # Event that join() waits on; set while no tasks are outstanding.
        # The caller's loop argument (possibly None) is forwarded unchanged.
        self._finished = locks.Event(loop=loop)
        self._finished.set()
        self._init(maxsize)

    # Storage hooks: subclasses may replace the next three methods.

    def _init(self, maxsize):
        # Create the container that holds the queued items.
        self._queue = collections.deque()

    def _get(self):
        # Take the item at the left end, i.e. the oldest one.
        oldest = self._queue.popleft()
        return oldest

    def _put(self, item):
        # Store the item at the right end.
        self._queue.append(item)

    # End of the storage hooks.

    def _wakeup_next(self, waiters):
        # Resolve the first future in waiters that is not done yet; futures
        # that are already done (e.g. cancelled) are popped and skipped.
        while waiters:
            candidate = waiters.popleft()
            if candidate.done():
                continue
            candidate.set_result(None)
            return

    def __repr__(self):
        details = self._format()
        return f'<{type(self).__name__} at {id(self):#x} {details}>'

    def __str__(self):
        details = self._format()
        return f'<{type(self).__name__} {details}>'

    def _format(self):
        # Build the state summary shared by __repr__ and __str__; sections
        # whose value is empty or zero are left out.
        pieces = [f'maxsize={self._maxsize!r}']
        if getattr(self, '_queue', None):
            pieces.append(f' _queue={list(self._queue)!r}')
        if self._getters:
            pieces.append(f' _getters[{len(self._getters)}]')
        if self._putters:
            pieces.append(f' _putters[{len(self._putters)}]')
        if self._unfinished_tasks:
            pieces.append(f' tasks={self._unfinished_tasks}')
        return ''.join(pieces)

    def qsize(self):
        """Return how many items the queue currently holds."""
        return len(self._queue)

    @property
    def maxsize(self):
        """The maxsize value the queue was created with."""
        return self._maxsize

    def empty(self):
        """Return True when the queue holds no items, otherwise False."""
        return not self._queue

    def full(self):
        """Return True when the queue holds at least maxsize items.

        Note: a queue created with maxsize=0 (the default), or any other
        non-positive maxsize, never reports itself as full.
        """
        limit = self._maxsize
        if limit <= 0:
            return False
        return self.qsize() >= limit

    async def put(self, item):
        """Add item to the queue, waiting for a free slot if necessary.

        While the queue is full, the caller is suspended on a future that
        get_nowait() resolves once an item has been removed.
        """
        while self.full():
            waiter = self._loop.create_future()
            self._putters.append(waiter)
            try:
                await waiter
            except BaseException:
                # Only matters if the future is still pending.
                waiter.cancel()
                try:
                    # Take our future out of the line of waiting putters.
                    self._putters.remove(waiter)
                except ValueError:
                    # An earlier get_nowait() call already popped it from
                    # self._putters.
                    pass
                if not (self.full() or waiter.cancelled()):
                    # get_nowait() woke us up but we cannot use the slot;
                    # hand the wakeup to the next putter in line.
                    self._wakeup_next(self._putters)
                raise
        return self.put_nowait(item)

    def put_nowait(self, item):
        """Add item to the queue without waiting.

        Raises QueueFull when no free slot is immediately available.
        """
        if self.full():
            raise QueueFull
        self._put(item)
        # Every stored item counts as unfinished until task_done() is called.
        self._unfinished_tasks += 1
        self._finished.clear()
        # An item is available now, so wake one waiting getter.
        self._wakeup_next(self._getters)

    async def get(self):
        """Remove an item from the queue and return it.

        When the queue is empty, wait until an item becomes available.
        """
        while self.empty():
            waiter = self._loop.create_future()
            self._getters.append(waiter)
            try:
                await waiter
            except BaseException:
                # Only matters if the future is still pending.
                waiter.cancel()
                try:
                    # Take our future out of the line of waiting getters.
                    self._getters.remove(waiter)
                except ValueError:
                    # An earlier put_nowait() call already popped it from
                    # self._getters.
                    pass
                if not (self.empty() or waiter.cancelled()):
                    # put_nowait() woke us up but we cannot take the item;
                    # hand the wakeup to the next getter in line.
                    self._wakeup_next(self._getters)
                raise
        return self.get_nowait()

    def get_nowait(self):
        """Remove an item from the queue and return it without waiting.

        Raises QueueEmpty when no item is immediately available.
        """
        if self.empty():
            raise QueueEmpty
        taken = self._get()
        # A slot was freed, so wake one waiting putter.
        self._wakeup_next(self._putters)
        return taken

    def task_done(self):
        """Report that work on a previously fetched item is finished.

        Meant for queue consumers: after each get() that fetched a task, a
        later call to task_done() tells the queue that processing of that
        task is complete.

        A join() that is currently waiting resumes once every item has been
        accounted for, i.e. a task_done() call was received for each item
        that had been put() into the queue.

        Raises ValueError when called more often than items were placed in
        the queue.
        """
        if self._unfinished_tasks <= 0:
            raise ValueError('task_done() called too many times')
        self._unfinished_tasks -= 1
        if not self._unfinished_tasks:
            self._finished.set()

    async def join(self):
        """Wait until every item in the queue has been fetched and processed.

        The unfinished-task counter grows whenever an item is added to the
        queue and shrinks whenever a consumer calls task_done() to signal
        that the item was retrieved and all work on it is complete.  join()
        returns once that counter has dropped to zero.
        """
        if self._unfinished_tasks <= 0:
            return
        await self._finished.wait()
Victor Stinner4cb814c2015-02-17 22:53:28 +0100221
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700222
class PriorityQueue(Queue):
    """Queue variant that hands back the lowest-valued entry first.

    Items are usually tuples shaped like (priority number, data).
    """

    def _init(self, maxsize):
        # A plain list that heapq keeps ordered as a heap.
        self._queue = list()

    def _put(self, item, heappush=heapq.heappush):
        # heappush is bound as a default argument when the method is defined.
        heappush(self._queue, item)

    def _get(self, heappop=heapq.heappop):
        # heappop removes and returns the smallest item on the heap.
        smallest = heappop(self._queue)
        return smallest
237
238
class LifoQueue(Queue):
    """Queue variant in which the newest entry is the first one returned."""

    def _init(self, maxsize):
        # A list used as a stack; its tail is the top.
        self._queue = list()

    def _put(self, item):
        # Push onto the top of the stack.
        stack = self._queue
        stack.append(item)

    def _get(self):
        # Pop from the top of the stack.
        stack = self._queue
        return stack.pop()
249 return self._queue.pop()