# Source blob: 512ea6037f8abb1d47fcc52453fbdca2fe9ed02c
# Public names exported by ``from <this module> import *``.
__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
import collections
import heapq

from . import events
from . import locks
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008
9
class QueueEmpty(Exception):
    """Raised when Queue.get_nowait() is called on an empty Queue."""
13
14
class QueueFull(Exception):
    """Raised when the Queue.put_nowait() method is called on a full Queue."""
18
19
class Queue:
    """A queue, useful for coordinating producer and consumer coroutines.

    If maxsize is less than or equal to zero, the queue size is infinite. If it
    is an integer greater than 0, then "await put()" will block when the
    queue reaches maxsize, until an item is removed by get().

    Unlike the standard library Queue, you can reliably know this Queue's size
    with qsize(), since your single-threaded asyncio application won't be
    interrupted between calling qsize() and doing an operation on the Queue.
    """

    def __init__(self, maxsize=0, *, loop=None):
        """Create a queue holding at most *maxsize* items.

        maxsize <= 0 means the queue is unbounded.  If *loop* is None the
        loop returned by events.get_event_loop() is used.
        """
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._maxsize = maxsize

        # Futures of get() callers waiting for an item to arrive.
        self._getters = collections.deque()
        # Futures of put() callers waiting for a free slot.
        self._putters = collections.deque()
        # Number of items put so far that have not been matched by a
        # task_done() call; join() waits for it to reach zero.
        self._unfinished_tasks = 0
        self._finished = locks.Event(loop=self._loop)
        self._finished.set()
        self._init(maxsize)

    # These three are overridable in subclasses.

    def _init(self, maxsize):
        """Create the underlying container (a FIFO deque here)."""
        self._queue = collections.deque()

    def _get(self):
        """Remove and return the next item from the container."""
        return self._queue.popleft()

    def _put(self, item):
        """Store *item* in the container."""
        self._queue.append(item)

    # End of the overridable methods.

    def _wakeup_next(self, waiters):
        """Resolve the first future in *waiters* that is not done yet.

        Futures that are already done (e.g. cancelled) are popped and
        discarded along the way.
        """
        # Wake up the next waiter (if any) that isn't cancelled.
        while waiters:
            waiter = waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                break

    def __repr__(self):
        return f'<{type(self).__name__} at {id(self):#x} {self._format()}>'

    def __str__(self):
        return f'<{type(self).__name__} {self._format()}>'

    def _format(self):
        """Return the state summary shared by __repr__() and __str__()."""
        result = f'maxsize={self._maxsize!r}'
        # getattr() guards against _queue being absent, e.g. when a
        # subclass's _init() did not create it.
        if getattr(self, '_queue', None):
            result += f' _queue={list(self._queue)!r}'
        if self._getters:
            result += f' _getters[{len(self._getters)}]'
        if self._putters:
            result += f' _putters[{len(self._putters)}]'
        if self._unfinished_tasks:
            result += f' tasks={self._unfinished_tasks}'
        return result

    def qsize(self):
        """Number of items in the queue."""
        return len(self._queue)

    @property
    def maxsize(self):
        """Number of items allowed in the queue."""
        return self._maxsize

    def empty(self):
        """Return True if the queue is empty, False otherwise."""
        return not self._queue

    def full(self):
        """Return True if there are maxsize items in the queue.

        Note: if the Queue was initialized with maxsize=0 (the default),
        then full() is never True.
        """
        if self._maxsize <= 0:
            return False
        else:
            return self.qsize() >= self._maxsize

    async def put(self, item):
        """Put an item into the queue.

        If the queue is full, wait until a free slot is available before
        adding item.
        """
        while self.full():
            putter = self._loop.create_future()
            self._putters.append(putter)
            try:
                await putter
            except:
                # Bare except is deliberate: cancellation must be handled
                # here too, and the exception is always re-raised below.
                putter.cancel()  # Just in case putter is not done yet.

                try:
                    # Drop our waiter so abandoned put() calls do not
                    # accumulate in self._putters.
                    self._putters.remove(putter)
                except ValueError:
                    # Already popped by _wakeup_next() from a previous
                    # get_nowait() call.
                    pass

                if not self.full() and not putter.cancelled():
                    # We were woken up by get_nowait(), but can't take
                    # the call.  Wake up the next in line.
                    self._wakeup_next(self._putters)
                raise
        return self.put_nowait(item)

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        If no free slot is immediately available, raise QueueFull.
        """
        if self.full():
            raise QueueFull
        self._put(item)
        self._unfinished_tasks += 1
        self._finished.clear()
        self._wakeup_next(self._getters)

    async def get(self):
        """Remove and return an item from the queue.

        If queue is empty, wait until an item is available.
        """
        while self.empty():
            getter = self._loop.create_future()
            self._getters.append(getter)
            try:
                await getter
            except:
                # Bare except is deliberate: cancellation must be handled
                # here too, and the exception is always re-raised below.
                getter.cancel()  # Just in case getter is not done yet.

                try:
                    # Drop our waiter so abandoned get() calls do not
                    # accumulate in self._getters.
                    self._getters.remove(getter)
                except ValueError:
                    # Already popped by _wakeup_next() from a previous
                    # put_nowait() call.
                    pass

                if not self.empty() and not getter.cancelled():
                    # We were woken up by put_nowait(), but can't take
                    # the call.  Wake up the next in line.
                    self._wakeup_next(self._getters)
                raise
        return self.get_nowait()

    def get_nowait(self):
        """Remove and return an item from the queue.

        Return an item if one is immediately available, else raise QueueEmpty.
        """
        if self.empty():
            raise QueueEmpty
        item = self._get()
        self._wakeup_next(self._putters)
        return item

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by queue consumers. For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items have
        been processed (meaning that a task_done() call was received for every
        item that had been put() into the queue).

        Raises ValueError if called more times than there were items placed in
        the queue.
        """
        if self._unfinished_tasks <= 0:
            raise ValueError('task_done() called too many times')
        self._unfinished_tasks -= 1
        if self._unfinished_tasks == 0:
            self._finished.set()

    async def join(self):
        """Block until all items in the queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer calls task_done() to
        indicate that the item was retrieved and all work on it is complete.
        When the count of unfinished tasks drops to zero, join() unblocks.
        """
        if self._unfinished_tasks > 0:
            await self._finished.wait()
Victor Stinner4cb814c2015-02-17 22:53:28 +0100209
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700210
class PriorityQueue(Queue):
    """A subclass of Queue; retrieves entries in priority order (lowest first).

    Entries are typically tuples of the form: (priority number, data).
    """

    def _init(self, maxsize):
        # A plain list maintained as a binary heap by heapq.
        self._queue = []

    # heappush/heappop are bound as default arguments so the calls below
    # resolve them as locals instead of module attribute lookups.
    def _put(self, item, heappush=heapq.heappush):
        heappush(self._queue, item)

    def _get(self, heappop=heapq.heappop):
        return heappop(self._queue)
225
226
class LifoQueue(Queue):
    """A subclass of Queue that retrieves most recently added entries first."""

    def _init(self, maxsize):
        # A plain list used as a stack: items are pushed and popped at the end.
        self._queue = []

    def _put(self, item):
        self._queue.append(item)

    def _get(self):
        return self._queue.pop()
237 return self._queue.pop()