blob: cd3f7c6a56789152052a2d00143b852bca384e0e [file] [log] [blame]
Yury Selivanov6370f342017-12-10 18:36:12 -05001__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
3import collections
4import heapq
Emmanuel Arias9008be32019-09-10 08:46:12 -03005import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07006
7from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008from . import locks
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009
10
Guido van Rossumfef70982014-01-25 17:24:51 -080011class QueueEmpty(Exception):
Yury Selivanov6370f342017-12-10 18:36:12 -050012 """Raised when Queue.get_nowait() is called on an empty Queue."""
Guido van Rossumfef70982014-01-25 17:24:51 -080013 pass
14
15
16class QueueFull(Exception):
Yury Selivanov6370f342017-12-10 18:36:12 -050017 """Raised when the Queue.put_nowait() method is called on a full Queue."""
Guido van Rossumfef70982014-01-25 17:24:51 -080018 pass
19
20
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021class Queue:
22 """A queue, useful for coordinating producer and consumer coroutines.
23
24 If maxsize is less than or equal to zero, the queue size is infinite. If it
Andrew Svetlov5f841b52017-12-09 00:23:48 +020025 is an integer greater than 0, then "await put()" will block when the
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026 queue reaches maxsize, until an item is removed by get().
27
28 Unlike the standard library Queue, you can reliably know this Queue's size
Victor Stinner2748bc72013-12-13 10:57:04 +010029 with qsize(), since your single-threaded asyncio application won't be
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070030 interrupted between calling qsize() and doing an operation on the Queue.
31 """
32
33 def __init__(self, maxsize=0, *, loop=None):
34 if loop is None:
35 self._loop = events.get_event_loop()
36 else:
37 self._loop = loop
Emmanuel Arias9008be32019-09-10 08:46:12 -030038 warnings.warn("The loop argument is deprecated since Python 3.8, "
39 "and scheduled for removal in Python 3.10.",
40 DeprecationWarning, stacklevel=2)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070041 self._maxsize = maxsize
42
43 # Futures.
44 self._getters = collections.deque()
Guido van Rossum99f96c52015-09-28 07:42:34 -070045 # Futures.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070046 self._putters = collections.deque()
Victor Stinner4cb814c2015-02-17 22:53:28 +010047 self._unfinished_tasks = 0
Andrew Svetlov7264e922019-09-11 11:20:24 +030048 self._finished = locks.Event(loop=loop)
Victor Stinner4cb814c2015-02-17 22:53:28 +010049 self._finished.set()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070050 self._init(maxsize)
51
Guido van Rossum0bd16bc2015-04-20 09:24:24 -070052 # These three are overridable in subclasses.
53
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070054 def _init(self, maxsize):
55 self._queue = collections.deque()
56
57 def _get(self):
58 return self._queue.popleft()
59
60 def _put(self, item):
61 self._queue.append(item)
Guido van Rossum0bd16bc2015-04-20 09:24:24 -070062
63 # End of the overridable methods.
64
Guido van Rossum99f96c52015-09-28 07:42:34 -070065 def _wakeup_next(self, waiters):
66 # Wake up the next waiter (if any) that isn't cancelled.
67 while waiters:
68 waiter = waiters.popleft()
69 if not waiter.done():
70 waiter.set_result(None)
71 break
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070072
73 def __repr__(self):
Yury Selivanov6370f342017-12-10 18:36:12 -050074 return f'<{type(self).__name__} at {id(self):#x} {self._format()}>'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070075
76 def __str__(self):
Yury Selivanov6370f342017-12-10 18:36:12 -050077 return f'<{type(self).__name__} {self._format()}>'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070078
Batuhan Taşkayadec36722019-12-07 14:05:07 +030079 def __class_getitem__(cls, type):
80 return cls
81
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070082 def _format(self):
Yury Selivanov6370f342017-12-10 18:36:12 -050083 result = f'maxsize={self._maxsize!r}'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070084 if getattr(self, '_queue', None):
Yury Selivanov6370f342017-12-10 18:36:12 -050085 result += f' _queue={list(self._queue)!r}'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070086 if self._getters:
Yury Selivanov6370f342017-12-10 18:36:12 -050087 result += f' _getters[{len(self._getters)}]'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070088 if self._putters:
Yury Selivanov6370f342017-12-10 18:36:12 -050089 result += f' _putters[{len(self._putters)}]'
Victor Stinner4cb814c2015-02-17 22:53:28 +010090 if self._unfinished_tasks:
Yury Selivanov6370f342017-12-10 18:36:12 -050091 result += f' tasks={self._unfinished_tasks}'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070092 return result
93
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070094 def qsize(self):
95 """Number of items in the queue."""
96 return len(self._queue)
97
98 @property
99 def maxsize(self):
100 """Number of items allowed in the queue."""
101 return self._maxsize
102
103 def empty(self):
104 """Return True if the queue is empty, False otherwise."""
105 return not self._queue
106
107 def full(self):
108 """Return True if there are maxsize items in the queue.
109
110 Note: if the Queue was initialized with maxsize=0 (the default),
111 then full() is never True.
112 """
113 if self._maxsize <= 0:
114 return False
115 else:
Victor Stinner66dc6b02014-06-17 23:36:21 +0200116 return self.qsize() >= self._maxsize
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700117
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200118 async def put(self, item):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700119 """Put an item into the queue.
120
Victor Stinner952ec982014-12-22 22:09:50 +0100121 Put an item into the queue. If the queue is full, wait until a free
122 slot is available before adding item.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700123 """
Guido van Rossum99f96c52015-09-28 07:42:34 -0700124 while self.full():
Yury Selivanov7661db62016-05-16 15:38:39 -0400125 putter = self._loop.create_future()
Guido van Rossum99f96c52015-09-28 07:42:34 -0700126 self._putters.append(putter)
127 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200128 await putter
Guido van Rossum99f96c52015-09-28 07:42:34 -0700129 except:
130 putter.cancel() # Just in case putter is not done yet.
José Melero Fernándezc47dacb2018-01-25 23:45:43 +0000131 try:
132 # Clean self._putters from canceled putters.
133 self._putters.remove(putter)
134 except ValueError:
135 # The putter could be removed from self._putters by a
136 # previous get_nowait call.
137 pass
Guido van Rossum99f96c52015-09-28 07:42:34 -0700138 if not self.full() and not putter.cancelled():
139 # We were woken up by get_nowait(), but can't take
140 # the call. Wake up the next in line.
141 self._wakeup_next(self._putters)
142 raise
143 return self.put_nowait(item)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700144
145 def put_nowait(self, item):
146 """Put an item into the queue without blocking.
147
Guido van Rossumfef70982014-01-25 17:24:51 -0800148 If no free slot is immediately available, raise QueueFull.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700149 """
Guido van Rossum99f96c52015-09-28 07:42:34 -0700150 if self.full():
Guido van Rossumfef70982014-01-25 17:24:51 -0800151 raise QueueFull
Guido van Rossum99f96c52015-09-28 07:42:34 -0700152 self._put(item)
153 self._unfinished_tasks += 1
154 self._finished.clear()
155 self._wakeup_next(self._getters)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700156
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200157 async def get(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700158 """Remove and return an item from the queue.
159
Victor Stinner952ec982014-12-22 22:09:50 +0100160 If queue is empty, wait until an item is available.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700161 """
Guido van Rossum99f96c52015-09-28 07:42:34 -0700162 while self.empty():
Yury Selivanov7661db62016-05-16 15:38:39 -0400163 getter = self._loop.create_future()
Guido van Rossum99f96c52015-09-28 07:42:34 -0700164 self._getters.append(getter)
Yury Selivanov3fc0f2d2015-08-05 13:52:33 -0400165 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200166 await getter
Guido van Rossum99f96c52015-09-28 07:42:34 -0700167 except:
168 getter.cancel() # Just in case getter is not done yet.
Suren Nihalanic62f0cb2017-11-07 09:35:23 -0800169 try:
José Melero Fernándezc47dacb2018-01-25 23:45:43 +0000170 # Clean self._getters from canceled getters.
Suren Nihalanic62f0cb2017-11-07 09:35:23 -0800171 self._getters.remove(getter)
172 except ValueError:
José Melero Fernándezc47dacb2018-01-25 23:45:43 +0000173 # The getter could be removed from self._getters by a
174 # previous put_nowait call.
Suren Nihalanic62f0cb2017-11-07 09:35:23 -0800175 pass
Guido van Rossum99f96c52015-09-28 07:42:34 -0700176 if not self.empty() and not getter.cancelled():
177 # We were woken up by put_nowait(), but can't take
178 # the call. Wake up the next in line.
179 self._wakeup_next(self._getters)
Yury Selivanov3fc0f2d2015-08-05 13:52:33 -0400180 raise
Guido van Rossum99f96c52015-09-28 07:42:34 -0700181 return self.get_nowait()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700182
183 def get_nowait(self):
184 """Remove and return an item from the queue.
185
Guido van Rossumfef70982014-01-25 17:24:51 -0800186 Return an item if one is immediately available, else raise QueueEmpty.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700187 """
Guido van Rossum99f96c52015-09-28 07:42:34 -0700188 if self.empty():
Guido van Rossumfef70982014-01-25 17:24:51 -0800189 raise QueueEmpty
Guido van Rossum99f96c52015-09-28 07:42:34 -0700190 item = self._get()
191 self._wakeup_next(self._putters)
192 return item
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700193
Victor Stinner4cb814c2015-02-17 22:53:28 +0100194 def task_done(self):
195 """Indicate that a formerly enqueued task is complete.
196
197 Used by queue consumers. For each get() used to fetch a task,
198 a subsequent call to task_done() tells the queue that the processing
199 on the task is complete.
200
201 If a join() is currently blocking, it will resume when all items have
202 been processed (meaning that a task_done() call was received for every
203 item that had been put() into the queue).
204
205 Raises ValueError if called more times than there were items placed in
206 the queue.
207 """
208 if self._unfinished_tasks <= 0:
209 raise ValueError('task_done() called too many times')
210 self._unfinished_tasks -= 1
211 if self._unfinished_tasks == 0:
212 self._finished.set()
213
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200214 async def join(self):
Victor Stinner4cb814c2015-02-17 22:53:28 +0100215 """Block until all items in the queue have been gotten and processed.
216
217 The count of unfinished tasks goes up whenever an item is added to the
218 queue. The count goes down whenever a consumer calls task_done() to
219 indicate that the item was retrieved and all work on it is complete.
220 When the count of unfinished tasks drops to zero, join() unblocks.
221 """
222 if self._unfinished_tasks > 0:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200223 await self._finished.wait()
Victor Stinner4cb814c2015-02-17 22:53:28 +0100224
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700225
226class PriorityQueue(Queue):
227 """A subclass of Queue; retrieves entries in priority order (lowest first).
228
229 Entries are typically tuples of the form: (priority number, data).
230 """
231
232 def _init(self, maxsize):
233 self._queue = []
234
235 def _put(self, item, heappush=heapq.heappush):
236 heappush(self._queue, item)
237
238 def _get(self, heappop=heapq.heappop):
239 return heappop(self._queue)
240
241
242class LifoQueue(Queue):
243 """A subclass of Queue that retrieves most recently added entries first."""
244
245 def _init(self, maxsize):
246 self._queue = []
247
248 def _put(self, item):
249 self._queue.append(item)
250
251 def _get(self):
252 return self._queue.pop()