blob: 4fc681dde9753227d2b0614cbf87a3230048c605 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Queues"""
2
Victor Stinnereaf16ab2015-07-25 02:40:40 +02003__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004
5import collections
6import heapq
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007
8from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009from . import locks
Victor Stinnere6ecea52015-07-09 23:13:50 +020010from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
12
class QueueEmpty(Exception):
    """Raised by Queue.get_nowait() when the queue holds no items."""
18
19
class QueueFull(Exception):
    """Raised by Queue.put_nowait() when the queue has no free slot."""
25
26
class Queue:
    """A queue, useful for coordinating producer and consumer coroutines.

    If maxsize is less than or equal to zero, the queue size is infinite. If it
    is an integer greater than 0, then "yield from put()" will block when the
    queue reaches maxsize, until an item is removed by get().

    Unlike the standard library Queue, you can reliably know this Queue's size
    with qsize(), since your single-threaded asyncio application won't be
    interrupted between calling qsize() and doing an operation on the Queue.
    """

    def __init__(self, maxsize=0, *, loop=None):
        """Create a queue bound to *loop* (default: the current event loop).

        maxsize <= 0 means the queue never reports itself as full.
        """
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._maxsize = maxsize

        # Futures of get() callers waiting for an item to arrive.
        self._getters = collections.deque()
        # Futures of put() callers waiting for a free slot.
        self._putters = collections.deque()
        # Incremented by put_nowait(), decremented by task_done().
        self._unfinished_tasks = 0
        # Set while there are no unfinished tasks; join() waits on it.
        self._finished = locks.Event(loop=self._loop)
        self._finished.set()
        self._init(maxsize)

    # These three are overridable in subclasses.

    def _init(self, maxsize):
        """Create the underlying container (a FIFO deque by default)."""
        self._queue = collections.deque()

    def _get(self):
        """Remove and return the next item from the container."""
        return self._queue.popleft()

    def _put(self, item):
        """Store *item* in the container."""
        self._queue.append(item)

    # End of the overridable methods.

    def _wakeup_next(self, waiters):
        """Resolve the first future in *waiters* that is not already done.

        Futures that are already done (for example cancelled ones) are
        popped and discarded along the way.
        """
        while waiters:
            waiter = waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                break

    def __repr__(self):
        return '<{} at {:#x} {}>'.format(
            type(self).__name__, id(self), self._format())

    def __str__(self):
        return '<{} {}>'.format(type(self).__name__, self._format())

    def _format(self):
        """Return the state summary shared by __repr__() and __str__()."""
        result = 'maxsize={!r}'.format(self._maxsize)
        # getattr(): _queue does not exist until _init() has run.
        if getattr(self, '_queue', None):
            result += ' _queue={!r}'.format(list(self._queue))
        if self._getters:
            result += ' _getters[{}]'.format(len(self._getters))
        if self._putters:
            result += ' _putters[{}]'.format(len(self._putters))
        if self._unfinished_tasks:
            result += ' tasks={}'.format(self._unfinished_tasks)
        return result

    def qsize(self):
        """Number of items in the queue."""
        return len(self._queue)

    @property
    def maxsize(self):
        """Number of items allowed in the queue."""
        return self._maxsize

    def empty(self):
        """Return True if the queue is empty, False otherwise."""
        return not self._queue

    def full(self):
        """Return True if there are maxsize items in the queue.

        Note: if the Queue was initialized with maxsize=0 (the default),
        then full() is never True.
        """
        if self._maxsize <= 0:
            return False
        else:
            return self.qsize() >= self._maxsize

    @coroutine
    def put(self, item):
        """Put an item into the queue.

        If the queue is full, wait until a free slot is available before
        adding item.

        This method is a coroutine.
        """
        while self.full():
            putter = self._loop.create_future()
            self._putters.append(putter)
            try:
                yield from putter
            except:
                # Bare except is deliberate: whatever interrupted the wait
                # (including cancellation) is re-raised below.
                putter.cancel()  # Just in case putter is not done yet.

                try:
                    # Drop our own future so that abandoned waiters do not
                    # accumulate in self._putters while the queue stays full.
                    self._putters.remove(putter)
                except ValueError:
                    # _wakeup_next() already popped it.
                    pass

                if not self.full() and not putter.cancelled():
                    # We were woken up by get_nowait(), but can't take
                    # the call.  Wake up the next in line.
                    self._wakeup_next(self._putters)
                raise
        return self.put_nowait(item)

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        If no free slot is immediately available, raise QueueFull.
        """
        if self.full():
            raise QueueFull
        self._put(item)
        self._unfinished_tasks += 1
        self._finished.clear()
        self._wakeup_next(self._getters)

    @coroutine
    def get(self):
        """Remove and return an item from the queue.

        If queue is empty, wait until an item is available.

        This method is a coroutine.
        """
        while self.empty():
            getter = self._loop.create_future()
            self._getters.append(getter)
            try:
                yield from getter
            except:
                # Bare except is deliberate: whatever interrupted the wait
                # (including cancellation) is re-raised below.
                getter.cancel()  # Just in case getter is not done yet.

                try:
                    # Drop our own future so that abandoned waiters do not
                    # accumulate in self._getters while the queue stays empty.
                    self._getters.remove(getter)
                except ValueError:
                    # _wakeup_next() already popped it.
                    pass

                if not self.empty() and not getter.cancelled():
                    # We were woken up by put_nowait(), but can't take
                    # the call.  Wake up the next in line.
                    self._wakeup_next(self._getters)
                raise
        return self.get_nowait()

    def get_nowait(self):
        """Remove and return an item from the queue.

        Return an item if one is immediately available, else raise QueueEmpty.
        """
        if self.empty():
            raise QueueEmpty
        item = self._get()
        self._wakeup_next(self._putters)
        return item

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by queue consumers. For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items have
        been processed (meaning that a task_done() call was received for every
        item that had been put() into the queue).

        Raises ValueError if called more times than there were items placed in
        the queue.
        """
        if self._unfinished_tasks <= 0:
            raise ValueError('task_done() called too many times')
        self._unfinished_tasks -= 1
        if self._unfinished_tasks == 0:
            self._finished.set()

    @coroutine
    def join(self):
        """Block until all items in the queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer calls task_done() to
        indicate that the item was retrieved and all work on it is complete.
        When the count of unfinished tasks drops to zero, join() unblocks.
        """
        if self._unfinished_tasks > 0:
            yield from self._finished.wait()
224
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700225
class PriorityQueue(Queue):
    """Queue variant that hands out its lowest entry first.

    Entries are kept on a heap; a typical entry is a tuple of the form
    (priority number, data).
    """

    def _init(self, maxsize):
        # A plain list is the storage that the heapq functions operate on.
        self._queue = list()

    def _put(self, item, heappush=heapq.heappush):
        heappush(self._queue, item)

    def _get(self, heappop=heapq.heappop):
        lowest = heappop(self._queue)
        return lowest
240
241
class LifoQueue(Queue):
    """Queue variant that returns the most recently added entry first."""

    def _init(self, maxsize):
        # A list used as a stack: push and pop both happen at the tail.
        self._queue = list()

    def _put(self, item):
        self._queue.append(item)

    def _get(self):
        newest = self._queue.pop()
        return newest