blob: 1c66d67b041304fc7f569acab652b0b8587bd19c [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Queues"""
2
Victor Stinnereaf16ab2015-07-25 02:40:40 +02003__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004
5import collections
6import heapq
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007
Victor Stinnereaf16ab2015-07-25 02:40:40 +02008from . import compat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010from . import locks
Victor Stinnere6ecea52015-07-09 23:13:50 +020011from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Guido van Rossumfef70982014-01-25 17:24:51 -080014class QueueEmpty(Exception):
Victor Stinner17d87f82015-02-03 15:09:24 +010015 """Exception raised when Queue.get_nowait() is called on a Queue object
16 which is empty.
17 """
Guido van Rossumfef70982014-01-25 17:24:51 -080018 pass
19
20
21class QueueFull(Exception):
Victor Stinner17d87f82015-02-03 15:09:24 +010022 """Exception raised when the Queue.put_nowait() method is called on a Queue
23 object which is full.
24 """
Guido van Rossumfef70982014-01-25 17:24:51 -080025 pass
26
27
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028class Queue:
29 """A queue, useful for coordinating producer and consumer coroutines.
30
31 If maxsize is less than or equal to zero, the queue size is infinite. If it
32 is an integer greater than 0, then "yield from put()" will block when the
33 queue reaches maxsize, until an item is removed by get().
34
35 Unlike the standard library Queue, you can reliably know this Queue's size
Victor Stinner2748bc72013-12-13 10:57:04 +010036 with qsize(), since your single-threaded asyncio application won't be
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070037 interrupted between calling qsize() and doing an operation on the Queue.
38 """
39
40 def __init__(self, maxsize=0, *, loop=None):
41 if loop is None:
42 self._loop = events.get_event_loop()
43 else:
44 self._loop = loop
45 self._maxsize = maxsize
46
47 # Futures.
48 self._getters = collections.deque()
Guido van Rossum99f96c52015-09-28 07:42:34 -070049 # Futures.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070050 self._putters = collections.deque()
Victor Stinner4cb814c2015-02-17 22:53:28 +010051 self._unfinished_tasks = 0
52 self._finished = locks.Event(loop=self._loop)
53 self._finished.set()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070054 self._init(maxsize)
55
Guido van Rossum0bd16bc2015-04-20 09:24:24 -070056 # These three are overridable in subclasses.
57
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070058 def _init(self, maxsize):
59 self._queue = collections.deque()
60
61 def _get(self):
62 return self._queue.popleft()
63
64 def _put(self, item):
65 self._queue.append(item)
Guido van Rossum0bd16bc2015-04-20 09:24:24 -070066
67 # End of the overridable methods.
68
Guido van Rossum99f96c52015-09-28 07:42:34 -070069 def _wakeup_next(self, waiters):
70 # Wake up the next waiter (if any) that isn't cancelled.
71 while waiters:
72 waiter = waiters.popleft()
73 if not waiter.done():
74 waiter.set_result(None)
75 break
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070076
77 def __repr__(self):
78 return '<{} at {:#x} {}>'.format(
79 type(self).__name__, id(self), self._format())
80
81 def __str__(self):
82 return '<{} {}>'.format(type(self).__name__, self._format())
83
84 def _format(self):
85 result = 'maxsize={!r}'.format(self._maxsize)
86 if getattr(self, '_queue', None):
87 result += ' _queue={!r}'.format(list(self._queue))
88 if self._getters:
89 result += ' _getters[{}]'.format(len(self._getters))
90 if self._putters:
91 result += ' _putters[{}]'.format(len(self._putters))
Victor Stinner4cb814c2015-02-17 22:53:28 +010092 if self._unfinished_tasks:
93 result += ' tasks={}'.format(self._unfinished_tasks)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070094 return result
95
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070096 def qsize(self):
97 """Number of items in the queue."""
98 return len(self._queue)
99
100 @property
101 def maxsize(self):
102 """Number of items allowed in the queue."""
103 return self._maxsize
104
105 def empty(self):
106 """Return True if the queue is empty, False otherwise."""
107 return not self._queue
108
109 def full(self):
110 """Return True if there are maxsize items in the queue.
111
112 Note: if the Queue was initialized with maxsize=0 (the default),
113 then full() is never True.
114 """
115 if self._maxsize <= 0:
116 return False
117 else:
Victor Stinner66dc6b02014-06-17 23:36:21 +0200118 return self.qsize() >= self._maxsize
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700119
120 @coroutine
121 def put(self, item):
122 """Put an item into the queue.
123
Victor Stinner952ec982014-12-22 22:09:50 +0100124 Put an item into the queue. If the queue is full, wait until a free
125 slot is available before adding item.
126
127 This method is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700128 """
Guido van Rossum99f96c52015-09-28 07:42:34 -0700129 while self.full():
Yury Selivanov7661db62016-05-16 15:38:39 -0400130 putter = self._loop.create_future()
Guido van Rossum99f96c52015-09-28 07:42:34 -0700131 self._putters.append(putter)
132 try:
133 yield from putter
134 except:
135 putter.cancel() # Just in case putter is not done yet.
136 if not self.full() and not putter.cancelled():
137 # We were woken up by get_nowait(), but can't take
138 # the call. Wake up the next in line.
139 self._wakeup_next(self._putters)
140 raise
141 return self.put_nowait(item)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700142
143 def put_nowait(self, item):
144 """Put an item into the queue without blocking.
145
Guido van Rossumfef70982014-01-25 17:24:51 -0800146 If no free slot is immediately available, raise QueueFull.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700147 """
Guido van Rossum99f96c52015-09-28 07:42:34 -0700148 if self.full():
Guido van Rossumfef70982014-01-25 17:24:51 -0800149 raise QueueFull
Guido van Rossum99f96c52015-09-28 07:42:34 -0700150 self._put(item)
151 self._unfinished_tasks += 1
152 self._finished.clear()
153 self._wakeup_next(self._getters)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700154
155 @coroutine
156 def get(self):
157 """Remove and return an item from the queue.
158
Victor Stinner952ec982014-12-22 22:09:50 +0100159 If queue is empty, wait until an item is available.
160
161 This method is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700162 """
Guido van Rossum99f96c52015-09-28 07:42:34 -0700163 while self.empty():
Yury Selivanov7661db62016-05-16 15:38:39 -0400164 getter = self._loop.create_future()
Guido van Rossum99f96c52015-09-28 07:42:34 -0700165 self._getters.append(getter)
Yury Selivanov3fc0f2d2015-08-05 13:52:33 -0400166 try:
Guido van Rossum99f96c52015-09-28 07:42:34 -0700167 yield from getter
168 except:
169 getter.cancel() # Just in case getter is not done yet.
Suren Nihalanic62f0cb2017-11-07 09:35:23 -0800170
171 try:
172 self._getters.remove(getter)
173 except ValueError:
174 pass
175
Guido van Rossum99f96c52015-09-28 07:42:34 -0700176 if not self.empty() and not getter.cancelled():
177 # We were woken up by put_nowait(), but can't take
178 # the call. Wake up the next in line.
179 self._wakeup_next(self._getters)
Yury Selivanov3fc0f2d2015-08-05 13:52:33 -0400180 raise
Guido van Rossum99f96c52015-09-28 07:42:34 -0700181 return self.get_nowait()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700182
183 def get_nowait(self):
184 """Remove and return an item from the queue.
185
Guido van Rossumfef70982014-01-25 17:24:51 -0800186 Return an item if one is immediately available, else raise QueueEmpty.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700187 """
Guido van Rossum99f96c52015-09-28 07:42:34 -0700188 if self.empty():
Guido van Rossumfef70982014-01-25 17:24:51 -0800189 raise QueueEmpty
Guido van Rossum99f96c52015-09-28 07:42:34 -0700190 item = self._get()
191 self._wakeup_next(self._putters)
192 return item
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700193
Victor Stinner4cb814c2015-02-17 22:53:28 +0100194 def task_done(self):
195 """Indicate that a formerly enqueued task is complete.
196
197 Used by queue consumers. For each get() used to fetch a task,
198 a subsequent call to task_done() tells the queue that the processing
199 on the task is complete.
200
201 If a join() is currently blocking, it will resume when all items have
202 been processed (meaning that a task_done() call was received for every
203 item that had been put() into the queue).
204
205 Raises ValueError if called more times than there were items placed in
206 the queue.
207 """
208 if self._unfinished_tasks <= 0:
209 raise ValueError('task_done() called too many times')
210 self._unfinished_tasks -= 1
211 if self._unfinished_tasks == 0:
212 self._finished.set()
213
214 @coroutine
215 def join(self):
216 """Block until all items in the queue have been gotten and processed.
217
218 The count of unfinished tasks goes up whenever an item is added to the
219 queue. The count goes down whenever a consumer calls task_done() to
220 indicate that the item was retrieved and all work on it is complete.
221 When the count of unfinished tasks drops to zero, join() unblocks.
222 """
223 if self._unfinished_tasks > 0:
224 yield from self._finished.wait()
225
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700226
227class PriorityQueue(Queue):
228 """A subclass of Queue; retrieves entries in priority order (lowest first).
229
230 Entries are typically tuples of the form: (priority number, data).
231 """
232
233 def _init(self, maxsize):
234 self._queue = []
235
236 def _put(self, item, heappush=heapq.heappush):
237 heappush(self._queue, item)
238
239 def _get(self, heappop=heapq.heappop):
240 return heappop(self._queue)
241
242
243class LifoQueue(Queue):
244 """A subclass of Queue that retrieves most recently added entries first."""
245
246 def _init(self, maxsize):
247 self._queue = []
248
249 def _put(self, item):
250 self._queue.append(item)
251
252 def _get(self):
253 return self._queue.pop()
254
255
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200256if not compat.PY35:
257 JoinableQueue = Queue
258 """Deprecated alias for Queue."""
259 __all__.append('JoinableQueue')