# blob: 572425e844c52ee33f20ad380918d6a736a3dc85
'''A multi-producer, multi-consumer queue.'''
Guido van Rossum9022fce1992-08-25 12:30:44 +00002
try:
    import threading
except ImportError:
    import dummy_threading as threading
from collections import deque
from heapq import heappush, heappop
from time import monotonic as time
Martin v. Löwis77ac4292002-10-15 15:11:13 +000010
# Public names exported by ``from <module> import *``.
__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']
Brett Cannonb42bb5a2003-07-01 05:34:27 +000012
class Empty(Exception):
    'Exception raised by Queue.get(block=0)/get_nowait().'
16
class Full(Exception):
    'Exception raised by Queue.put(block=0)/put_nowait().'
Guido van Rossum9022fce1992-08-25 12:30:44 +000020
class Queue:
    '''Create a queue object with a given maximum size.

    If maxsize is <= 0, the queue size is infinite.

    Subclasses change the queue discipline by overriding the _init(),
    _qsize(), _put() and _get() hooks; the public methods call those
    hooks while holding the queue's mutex.
    '''

    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)

        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the three conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = threading.Lock()

        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = threading.Condition(self.mutex)

        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = threading.Condition(self.mutex)

        # Notify all_tasks_done whenever the number of unfinished tasks
        # drops to zero; a thread waiting to join() is notified to resume.
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    def task_done(self):
        '''Indicate that a formerly enqueued task is complete.

        Used by Queue consumer threads.  For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items
        have been processed (meaning that a task_done() call was received
        for every item that had been put() into the queue).

        Raises a ValueError if called more times than there were items
        placed in the queue.
        '''
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    # Raise before storing, so the counter never goes
                    # negative.
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished

    def join(self):
        '''Blocks until all items in the Queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue.  The count goes down whenever a consumer thread calls
        task_done() to indicate the item was retrieved and all work on it is
        complete.

        When the count of unfinished tasks drops to zero, join() unblocks.
        '''
        with self.all_tasks_done:
            # Re-check the counter after every wake-up.
            while self.unfinished_tasks:
                self.all_tasks_done.wait()

    def qsize(self):
        '''Return the approximate size of the queue (not reliable!).'''
        with self.mutex:
            return self._qsize()

    def empty(self):
        '''Return True if the queue is empty, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() == 0
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can grow before the result of empty() or
        qsize() can be used.

        To create code that needs to wait for all queued tasks to be
        completed, the preferred technique is to use the join() method.
        '''
        with self.mutex:
            return not self._qsize()

    def full(self):
        '''Return True if the queue is full, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() >= n
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can shrink before the result of full() or
        qsize() can be used.
        '''
        with self.mutex:
            # A maxsize <= 0 means "unbounded", which is never full.
            return 0 < self.maxsize <= self._qsize()

    def put(self, item, block=True, timeout=None):
        '''Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available.  If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).

        Raises ValueError if the queue is bounded, 'block' is true and
        'timeout' is a negative number.
        '''
        with self.not_full:
            # Capacity (and the timeout argument) only matter for a
            # bounded queue.
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    # Wait against a fixed deadline so repeated wake-ups
                    # cannot extend the total wait.
                    endtime = time() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def get(self, block=True, timeout=None):
        '''Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available.  If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).

        Raises ValueError if 'block' is true and 'timeout' is a negative
        number.
        '''
        with self.not_empty:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                # Wait against a fixed deadline so repeated wake-ups
                # cannot extend the total wait.
                endtime = time() + timeout
                while not self._qsize():
                    remaining = endtime - time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

    def put_nowait(self, item):
        '''Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        '''
        return self.put(item, block=False)

    def get_nowait(self):
        '''Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available.  Otherwise
        raise the Empty exception.
        '''
        return self.get(block=False)

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.queue = deque()

    # Return the number of items currently stored
    def _qsize(self):
        return len(self.queue)

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        return self.queue.popleft()
Raymond Hettinger35641462008-01-17 00:13:27 +0000212
213
class PriorityQueue(Queue):
    '''Variant of Queue that retrieves open entries in priority order (lowest first).

    Entries are typically tuples of the form:  (priority number, data).
    '''

    # Storage is a plain list maintained as a binary heap by heapq.
    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    # Insert while keeping the heap invariant
    def _put(self, item):
        heappush(self.queue, item)

    # Remove and return the smallest entry
    def _get(self):
        return heappop(self.queue)
231
232
class LifoQueue(Queue):
    '''Variant of Queue that retrieves most recently added entries first.'''

    # Storage is a plain list used as a stack.
    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    # Push onto the end of the list
    def _put(self, item):
        self.queue.append(item)

    # Pop from the end of the list, i.e. the newest entry
    def _get(self):
        return self.queue.pop()