blob: 78ae9e99ccf0e9b6a1010c2c26f7f1d7ea633ca3 [file] [log] [blame]
# Names exported by a star-import of this module.
__all__ = (
    'Queue',
    'PriorityQueue',
    'LifoQueue',
    'QueueFull',
    'QueueEmpty',
)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
3import collections
4import heapq
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07006from . import locks
Yurii Karabas0ec34ca2020-11-24 20:08:54 +02007from . import mixins
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008
9
class QueueEmpty(Exception):
    """Signal that Queue.get_nowait() found no item in the Queue."""
13
14
class QueueFull(Exception):
    """Signal that Queue.put_nowait() was called while the Queue was full."""
18
19
class Queue(mixins._LoopBoundedMixin):
    """FIFO queue for passing items between producer and consumer coroutines.

    A maxsize of zero or less means the queue never reports itself as full.
    With a maxsize greater than 0, "await put()" waits once the queue holds
    maxsize items and continues after get() has removed one.

    In contrast to the standard library Queue, the value returned by qsize()
    can be relied upon: a single-threaded asyncio application is not
    interrupted between calling qsize() and acting on the Queue.
    """

    def __init__(self, maxsize=0):
        self._maxsize = maxsize

        # Futures created by get() while it waits for an item.
        self._getters = collections.deque()
        # Futures created by put() while it waits for a free slot.
        self._putters = collections.deque()

        # Bookkeeping for task_done() / join(); the event starts out set
        # because nothing is outstanding yet.
        self._unfinished_tasks = 0
        self._finished = locks.Event()
        self._finished.set()

        # Create the underlying container last, via the subclass hook.
        self._init(maxsize)

    # The following three hooks may be overridden by subclasses.

    def _init(self, maxsize):
        """Create the container that stores the queued items."""
        self._queue = collections.deque()

    def _get(self):
        """Remove and return the next item from the container."""
        return self._queue.popleft()

    def _put(self, item):
        """Store item in the container."""
        self._queue.append(item)

    # End of the overridable hooks.

    def _wakeup_next(self, waiters):
        """Resolve the first future in waiters that is not done yet.

        Futures that are already done are dropped from the deque on the way.
        """
        while waiters:
            candidate = waiters.popleft()
            if candidate.done():
                continue
            candidate.set_result(None)
            return

    def __repr__(self):
        cls_name = type(self).__name__
        return f'<{cls_name} at {id(self):#x} {self._format()}>'

    def __str__(self):
        cls_name = type(self).__name__
        return f'<{cls_name} {self._format()}>'

    def __class_getitem__(cls, type):
        """Allow subscripting the class (Queue[T]); the class is returned as is."""
        return cls

    def _format(self):
        """Build the state summary shared by __repr__() and __str__()."""
        parts = [f'maxsize={self._maxsize!r}']
        # getattr() with a default tolerates a missing _queue attribute.
        if getattr(self, '_queue', None):
            parts.append(f'_queue={list(self._queue)!r}')
        if self._getters:
            parts.append(f'_getters[{len(self._getters)}]')
        if self._putters:
            parts.append(f'_putters[{len(self._putters)}]')
        if self._unfinished_tasks:
            parts.append(f'tasks={self._unfinished_tasks}')
        return ' '.join(parts)

    def qsize(self):
        """Return the number of items currently held in the queue."""
        return len(self._queue)

    @property
    def maxsize(self):
        """The maxsize value the queue was created with."""
        return self._maxsize

    def empty(self):
        """Return True when the queue holds no items, False otherwise."""
        if self._queue:
            return False
        return True

    def full(self):
        """Return True when the queue holds at least maxsize items.

        Note: a queue created with maxsize=0 (the default), or any other
        value less than or equal to zero, is never full.
        """
        if self._maxsize <= 0:
            return False
        return self.qsize() >= self._maxsize

    async def put(self, item):
        """Add item to the queue, waiting for a free slot if necessary.

        While the queue is full, the caller is parked on a future stored in
        the putters deque; once the queue is no longer full, the item is
        added through put_nowait().
        """
        while self.full():
            slot_waiter = self._get_loop().create_future()
            self._putters.append(slot_waiter)
            try:
                await slot_waiter
            except BaseException:
                # Just in case the future is not done yet.
                slot_waiter.cancel()
                try:
                    # Drop our own future from the deque of waiting putters.
                    self._putters.remove(slot_waiter)
                except ValueError:
                    # It may already have been popped by an earlier
                    # get_nowait() call.
                    pass
                if not (self.full() or slot_waiter.cancelled()):
                    # get_nowait() woke us up but we cannot use the slot;
                    # hand the wake-up over to the next putter in line.
                    self._wakeup_next(self._putters)
                raise
        return self.put_nowait(item)

    def put_nowait(self, item):
        """Add item to the queue without waiting.

        Raises QueueFull when no free slot is immediately available.
        """
        if self.full():
            raise QueueFull
        self._put(item)
        # Every stored item counts as unfinished until task_done() is called.
        self._unfinished_tasks += 1
        self._finished.clear()
        self._wakeup_next(self._getters)

    async def get(self):
        """Remove and return an item, waiting until one is available.

        While the queue is empty, the caller is parked on a future stored in
        the getters deque; once the queue is no longer empty, the item is
        taken through get_nowait().
        """
        while self.empty():
            item_waiter = self._get_loop().create_future()
            self._getters.append(item_waiter)
            try:
                await item_waiter
            except BaseException:
                # Just in case the future is not done yet.
                item_waiter.cancel()
                try:
                    # Drop our own future from the deque of waiting getters.
                    self._getters.remove(item_waiter)
                except ValueError:
                    # It may already have been popped by an earlier
                    # put_nowait() call.
                    pass
                if not (self.empty() or item_waiter.cancelled()):
                    # put_nowait() woke us up but we cannot take the item;
                    # hand the wake-up over to the next getter in line.
                    self._wakeup_next(self._getters)
                raise
        return self.get_nowait()

    def get_nowait(self):
        """Remove and return an item without waiting.

        Raises QueueEmpty when no item is immediately available.
        """
        if self.empty():
            raise QueueEmpty
        taken = self._get()
        # A slot was just freed, so one waiting putter may proceed.
        self._wakeup_next(self._putters)
        return taken

    def task_done(self):
        """Mark one previously enqueued task as completed.

        Intended for queue consumers: after each get() that fetched a task,
        a call to task_done() reports that work on that task has ended.

        Once the count of unfinished tasks reaches zero, the internal event
        is set, which lets a pending join() resume.

        Raises ValueError when called more often than items were placed in
        the queue.
        """
        if self._unfinished_tasks <= 0:
            raise ValueError('task_done() called too many times')
        self._unfinished_tasks -= 1
        if self._unfinished_tasks == 0:
            self._finished.set()

    async def join(self):
        """Wait until every item put into the queue has been processed.

        The unfinished-task count is incremented for each item added and
        decremented by each task_done() call. join() returns immediately
        when the count is not positive; otherwise it waits on the internal
        event, which task_done() sets when the count drops to zero.
        """
        if self._unfinished_tasks > 0:
            await self._finished.wait()
Victor Stinner4cb814c2015-02-17 22:53:28 +0100216
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700217
class PriorityQueue(Queue):
    """Queue variant that hands out entries in priority order (lowest first).

    Entries are typically tuples of the form: (priority number, data).
    """

    def _init(self, maxsize):
        # A plain list serves as the heap that heapq operates on.
        heap = []
        self._queue = heap

    def _put(self, item, heappush=heapq.heappush):
        heap = self._queue
        heappush(heap, item)

    def _get(self, heappop=heapq.heappop):
        heap = self._queue
        return heappop(heap)
232
233
class LifoQueue(Queue):
    """Queue variant that returns the most recently added entry first."""

    def _init(self, maxsize):
        # A list used as a stack: items are appended to and popped from
        # the same end.
        stack = []
        self._queue = stack

    def _put(self, item):
        stack = self._queue
        stack.append(item)

    def _get(self):
        stack = self._queue
        return stack.pop()