blob: 2a8d3e76044109c4761bc85f31bcee2aef1a2bbb [file] [log] [blame]
Yury Selivanov6370f342017-12-10 18:36:12 -05001__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
3import collections
4import heapq
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005
6from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007from . import locks
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008
9
Guido van Rossumfef70982014-01-25 17:24:51 -080010class QueueEmpty(Exception):
Yury Selivanov6370f342017-12-10 18:36:12 -050011 """Raised when Queue.get_nowait() is called on an empty Queue."""
Guido van Rossumfef70982014-01-25 17:24:51 -080012 pass
13
14
15class QueueFull(Exception):
Yury Selivanov6370f342017-12-10 18:36:12 -050016 """Raised when the Queue.put_nowait() method is called on a full Queue."""
Guido van Rossumfef70982014-01-25 17:24:51 -080017 pass
18
19
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020class Queue:
21 """A queue, useful for coordinating producer and consumer coroutines.
22
23 If maxsize is less than or equal to zero, the queue size is infinite. If it
Andrew Svetlov5f841b52017-12-09 00:23:48 +020024 is an integer greater than 0, then "await put()" will block when the
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025 queue reaches maxsize, until an item is removed by get().
26
27 Unlike the standard library Queue, you can reliably know this Queue's size
Victor Stinner2748bc72013-12-13 10:57:04 +010028 with qsize(), since your single-threaded asyncio application won't be
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070029 interrupted between calling qsize() and doing an operation on the Queue.
30 """
31
32 def __init__(self, maxsize=0, *, loop=None):
33 if loop is None:
34 self._loop = events.get_event_loop()
35 else:
36 self._loop = loop
37 self._maxsize = maxsize
38
39 # Futures.
40 self._getters = collections.deque()
Guido van Rossum99f96c52015-09-28 07:42:34 -070041 # Futures.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070042 self._putters = collections.deque()
Victor Stinner4cb814c2015-02-17 22:53:28 +010043 self._unfinished_tasks = 0
44 self._finished = locks.Event(loop=self._loop)
45 self._finished.set()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070046 self._init(maxsize)
47
Guido van Rossum0bd16bc2015-04-20 09:24:24 -070048 # These three are overridable in subclasses.
49
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070050 def _init(self, maxsize):
51 self._queue = collections.deque()
52
53 def _get(self):
54 return self._queue.popleft()
55
56 def _put(self, item):
57 self._queue.append(item)
Guido van Rossum0bd16bc2015-04-20 09:24:24 -070058
59 # End of the overridable methods.
60
Guido van Rossum99f96c52015-09-28 07:42:34 -070061 def _wakeup_next(self, waiters):
62 # Wake up the next waiter (if any) that isn't cancelled.
63 while waiters:
64 waiter = waiters.popleft()
65 if not waiter.done():
66 waiter.set_result(None)
67 break
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070068
69 def __repr__(self):
Yury Selivanov6370f342017-12-10 18:36:12 -050070 return f'<{type(self).__name__} at {id(self):#x} {self._format()}>'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070071
72 def __str__(self):
Yury Selivanov6370f342017-12-10 18:36:12 -050073 return f'<{type(self).__name__} {self._format()}>'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070074
75 def _format(self):
Yury Selivanov6370f342017-12-10 18:36:12 -050076 result = f'maxsize={self._maxsize!r}'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070077 if getattr(self, '_queue', None):
Yury Selivanov6370f342017-12-10 18:36:12 -050078 result += f' _queue={list(self._queue)!r}'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070079 if self._getters:
Yury Selivanov6370f342017-12-10 18:36:12 -050080 result += f' _getters[{len(self._getters)}]'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070081 if self._putters:
Yury Selivanov6370f342017-12-10 18:36:12 -050082 result += f' _putters[{len(self._putters)}]'
Victor Stinner4cb814c2015-02-17 22:53:28 +010083 if self._unfinished_tasks:
Yury Selivanov6370f342017-12-10 18:36:12 -050084 result += f' tasks={self._unfinished_tasks}'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070085 return result
86
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070087 def qsize(self):
88 """Number of items in the queue."""
89 return len(self._queue)
90
91 @property
92 def maxsize(self):
93 """Number of items allowed in the queue."""
94 return self._maxsize
95
96 def empty(self):
97 """Return True if the queue is empty, False otherwise."""
98 return not self._queue
99
100 def full(self):
101 """Return True if there are maxsize items in the queue.
102
103 Note: if the Queue was initialized with maxsize=0 (the default),
104 then full() is never True.
105 """
106 if self._maxsize <= 0:
107 return False
108 else:
Victor Stinner66dc6b02014-06-17 23:36:21 +0200109 return self.qsize() >= self._maxsize
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700110
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200111 async def put(self, item):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700112 """Put an item into the queue.
113
Victor Stinner952ec982014-12-22 22:09:50 +0100114 Put an item into the queue. If the queue is full, wait until a free
115 slot is available before adding item.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700116 """
Guido van Rossum99f96c52015-09-28 07:42:34 -0700117 while self.full():
Yury Selivanov7661db62016-05-16 15:38:39 -0400118 putter = self._loop.create_future()
Guido van Rossum99f96c52015-09-28 07:42:34 -0700119 self._putters.append(putter)
120 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200121 await putter
Guido van Rossum99f96c52015-09-28 07:42:34 -0700122 except:
123 putter.cancel() # Just in case putter is not done yet.
José Melero Fernándezc47dacb2018-01-25 23:45:43 +0000124 try:
125 # Clean self._putters from canceled putters.
126 self._putters.remove(putter)
127 except ValueError:
128 # The putter could be removed from self._putters by a
129 # previous get_nowait call.
130 pass
Guido van Rossum99f96c52015-09-28 07:42:34 -0700131 if not self.full() and not putter.cancelled():
132 # We were woken up by get_nowait(), but can't take
133 # the call. Wake up the next in line.
134 self._wakeup_next(self._putters)
135 raise
136 return self.put_nowait(item)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700137
138 def put_nowait(self, item):
139 """Put an item into the queue without blocking.
140
Guido van Rossumfef70982014-01-25 17:24:51 -0800141 If no free slot is immediately available, raise QueueFull.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700142 """
Guido van Rossum99f96c52015-09-28 07:42:34 -0700143 if self.full():
Guido van Rossumfef70982014-01-25 17:24:51 -0800144 raise QueueFull
Guido van Rossum99f96c52015-09-28 07:42:34 -0700145 self._put(item)
146 self._unfinished_tasks += 1
147 self._finished.clear()
148 self._wakeup_next(self._getters)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700149
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200150 async def get(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700151 """Remove and return an item from the queue.
152
Victor Stinner952ec982014-12-22 22:09:50 +0100153 If queue is empty, wait until an item is available.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700154 """
Guido van Rossum99f96c52015-09-28 07:42:34 -0700155 while self.empty():
Yury Selivanov7661db62016-05-16 15:38:39 -0400156 getter = self._loop.create_future()
Guido van Rossum99f96c52015-09-28 07:42:34 -0700157 self._getters.append(getter)
Yury Selivanov3fc0f2d2015-08-05 13:52:33 -0400158 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200159 await getter
Guido van Rossum99f96c52015-09-28 07:42:34 -0700160 except:
161 getter.cancel() # Just in case getter is not done yet.
Suren Nihalanic62f0cb2017-11-07 09:35:23 -0800162 try:
José Melero Fernándezc47dacb2018-01-25 23:45:43 +0000163 # Clean self._getters from canceled getters.
Suren Nihalanic62f0cb2017-11-07 09:35:23 -0800164 self._getters.remove(getter)
165 except ValueError:
José Melero Fernándezc47dacb2018-01-25 23:45:43 +0000166 # The getter could be removed from self._getters by a
167 # previous put_nowait call.
Suren Nihalanic62f0cb2017-11-07 09:35:23 -0800168 pass
Guido van Rossum99f96c52015-09-28 07:42:34 -0700169 if not self.empty() and not getter.cancelled():
170 # We were woken up by put_nowait(), but can't take
171 # the call. Wake up the next in line.
172 self._wakeup_next(self._getters)
Yury Selivanov3fc0f2d2015-08-05 13:52:33 -0400173 raise
Guido van Rossum99f96c52015-09-28 07:42:34 -0700174 return self.get_nowait()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700175
176 def get_nowait(self):
177 """Remove and return an item from the queue.
178
Guido van Rossumfef70982014-01-25 17:24:51 -0800179 Return an item if one is immediately available, else raise QueueEmpty.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700180 """
Guido van Rossum99f96c52015-09-28 07:42:34 -0700181 if self.empty():
Guido van Rossumfef70982014-01-25 17:24:51 -0800182 raise QueueEmpty
Guido van Rossum99f96c52015-09-28 07:42:34 -0700183 item = self._get()
184 self._wakeup_next(self._putters)
185 return item
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700186
Victor Stinner4cb814c2015-02-17 22:53:28 +0100187 def task_done(self):
188 """Indicate that a formerly enqueued task is complete.
189
190 Used by queue consumers. For each get() used to fetch a task,
191 a subsequent call to task_done() tells the queue that the processing
192 on the task is complete.
193
194 If a join() is currently blocking, it will resume when all items have
195 been processed (meaning that a task_done() call was received for every
196 item that had been put() into the queue).
197
198 Raises ValueError if called more times than there were items placed in
199 the queue.
200 """
201 if self._unfinished_tasks <= 0:
202 raise ValueError('task_done() called too many times')
203 self._unfinished_tasks -= 1
204 if self._unfinished_tasks == 0:
205 self._finished.set()
206
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200207 async def join(self):
Victor Stinner4cb814c2015-02-17 22:53:28 +0100208 """Block until all items in the queue have been gotten and processed.
209
210 The count of unfinished tasks goes up whenever an item is added to the
211 queue. The count goes down whenever a consumer calls task_done() to
212 indicate that the item was retrieved and all work on it is complete.
213 When the count of unfinished tasks drops to zero, join() unblocks.
214 """
215 if self._unfinished_tasks > 0:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200216 await self._finished.wait()
Victor Stinner4cb814c2015-02-17 22:53:28 +0100217
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700218
219class PriorityQueue(Queue):
220 """A subclass of Queue; retrieves entries in priority order (lowest first).
221
222 Entries are typically tuples of the form: (priority number, data).
223 """
224
225 def _init(self, maxsize):
226 self._queue = []
227
228 def _put(self, item, heappush=heapq.heappush):
229 heappush(self._queue, item)
230
231 def _get(self, heappop=heapq.heappop):
232 return heappop(self._queue)
233
234
235class LifoQueue(Queue):
236 """A subclass of Queue that retrieves most recently added entries first."""
237
238 def _init(self, maxsize):
239 self._queue = []
240
241 def _put(self, item):
242 self._queue.append(item)
243
244 def _get(self):
245 return self._queue.pop()