blob: c803b96deb085d7fef345ac8e8e01ce549f22c5e [file] [log] [blame]
Raymond Hettinger0c5e52f2012-01-09 06:17:39 +00001'''A multi-producer, multi-consumer queue.'''
Guido van Rossum9022fce1992-08-25 12:30:44 +00002
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003import threading
Raymond Hettinger756b3f32004-01-29 06:37:52 +00004from collections import deque
Raymond Hettinger143f51a2012-01-09 05:32:01 +00005from heapq import heappush, heappop
Victor Stinnerae586492014-09-02 23:18:25 +02006from time import monotonic as time
Martin v. Löwis77ac4292002-10-15 15:11:13 +00007
Raymond Hettinger35641462008-01-17 00:13:27 +00008__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']
Brett Cannonb42bb5a2003-07-01 05:34:27 +00009
class Empty(Exception):
    '''Exception raised when Queue.get() finds no item to return.

    Raised by get(block=False)/get_nowait() on an empty queue, and by a
    blocking get() whose 'timeout' expires before an item arrives.
    '''
13
class Full(Exception):
    '''Exception raised when Queue.put() finds no free slot.

    Raised by put(block=False)/put_nowait() on a full queue, and by a
    blocking put() whose 'timeout' expires before a slot is freed.
    '''
Guido van Rossum9022fce1992-08-25 12:30:44 +000017
class Queue:
    '''Create a queue object with a given maximum size.

    If maxsize is <= 0, the queue size is infinite.

    Items come out in the order they were put in (FIFO).  Subclasses get
    a different ordering by overriding _init(), _qsize(), _put() and
    _get().
    '''

    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)

        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the three conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = threading.Lock()

        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = threading.Condition(self.mutex)

        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = threading.Condition(self.mutex)

        # Notify all_tasks_done whenever the number of unfinished tasks
        # drops to zero; a thread waiting in join() is notified to resume.
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    def task_done(self):
        '''Indicate that a formerly enqueued task is complete.

        Used by Queue consumer threads.  For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items
        have been processed (meaning that a task_done() call was received
        for every item that had been put() into the queue).

        Raises a ValueError if called more times than there were items
        placed in the queue.
        '''
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    # Raise before storing, so the counter never goes
                    # negative.
                    raise ValueError('task_done() called too many times')
                # The count is stored just below, still inside this
                # with-block, i.e. before the shared mutex is released.
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished

    def join(self):
        '''Blocks until all items in the Queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue.  The count goes down whenever a consumer thread calls
        task_done() to indicate the item was retrieved and all work on it is
        complete.

        When the count of unfinished tasks drops to zero, join() unblocks.
        '''
        with self.all_tasks_done:
            # The counter is re-tested after every wake-up.
            while self.unfinished_tasks:
                self.all_tasks_done.wait()

    def qsize(self):
        '''Return the approximate size of the queue (not reliable!).'''
        with self.mutex:
            return self._qsize()

    def empty(self):
        '''Return True if the queue is empty, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() == 0
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can grow before the result of empty() or
        qsize() can be used.

        To create code that needs to wait for all queued tasks to be
        completed, the preferred technique is to use the join() method.
        '''
        with self.mutex:
            return not self._qsize()

    def full(self):
        '''Return True if the queue is full, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() >= n
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can shrink before the result of full() or
        qsize() can be used.
        '''
        with self.mutex:
            # The "0 <" half makes an unbounded queue (maxsize <= 0)
            # report False regardless of its length.
            return 0 < self.maxsize <= self._qsize()

    def put(self, item, block=True, timeout=None):
        '''Put an item into the queue.

        If optional arg 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available.  If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).

        Raises ValueError if 'block' is true and 'timeout' is negative.
        When maxsize is <= 0 neither 'block' nor 'timeout' is examined:
        the item is always stored immediately.
        '''
        with self.not_full:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise Full
                elif timeout is None:
                    # The size is re-tested after every wake-up.
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    # Fix the deadline once so repeated wake-ups cannot
                    # stretch the total wait beyond 'timeout'.
                    endtime = time() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def get(self, block=True, timeout=None):
        '''Remove and return an item from the queue.

        If optional arg 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available.  If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).

        Raises ValueError if 'block' is true and 'timeout' is negative.
        '''
        with self.not_empty:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                # The size is re-tested after every wake-up.
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                # Fix the deadline once so repeated wake-ups cannot
                # stretch the total wait beyond 'timeout'.
                endtime = time() + timeout
                while not self._qsize():
                    remaining = endtime - time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

    def put_nowait(self, item):
        '''Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        '''
        return self.put(item, block=False)

    def get_nowait(self):
        '''Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available.  Otherwise
        raise the Empty exception.
        '''
        return self.get(block=False)

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.queue = deque()

    # Return the number of items currently stored
    def _qsize(self):
        return len(self.queue)

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        return self.queue.popleft()
Raymond Hettinger35641462008-01-17 00:13:27 +0000209
210
class PriorityQueue(Queue):
    '''Variant of Queue that retrieves open entries in priority order
    (lowest first).

    Entries are typically tuples of the form:  (priority number, data).
    '''

    # The storage is a plain list kept in heap order by heapq.
    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    # Insert while keeping the list in heap order
    def _put(self, item):
        heappush(self.queue, item)

    # Remove and return the smallest entry
    def _get(self):
        return heappop(self.queue)
228
229
class LifoQueue(Queue):
    '''Variant of Queue that retrieves most recently added entries first.'''

    # The storage is a plain list used as a stack.
    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    # Push onto the end of the list
    def _put(self, item):
        self.queue.append(item)

    # Pop from the same end, so the newest entry comes out first
    def _get(self):
        return self.queue.pop()