| Guido van Rossum | 4acc25b | 2000-02-02 15:10:15 +0000 | [diff] [blame] | 1 | """A multi-producer, multi-consumer queue.""" | 
| Guido van Rossum | 9022fce | 1992-08-25 12:30:44 +0000 | [diff] [blame] | 2 |  | 
| Tim Peters | 5af0e41 | 2004-07-12 00:45:14 +0000 | [diff] [blame] | 3 | from time import time as _time | 
| Raymond Hettinger | 756b3f3 | 2004-01-29 06:37:52 +0000 | [diff] [blame] | 4 | from collections import deque | 
| Martin v. Löwis | 77ac429 | 2002-10-15 15:11:13 +0000 | [diff] [blame] | 5 |  | 
| Brett Cannon | b42bb5a | 2003-07-01 05:34:27 +0000 | [diff] [blame] | 6 | __all__ = ['Empty', 'Full', 'Queue'] | 
 | 7 |  | 
| Tim Peters | b8c9417 | 2001-01-15 22:53:46 +0000 | [diff] [blame] | 8 | class Empty(Exception): | 
 | 9 |     "Exception raised by Queue.get(block=0)/get_nowait()." | 
 | 10 |     pass | 
 | 11 |  | 
 | 12 | class Full(Exception): | 
 | 13 |     "Exception raised by Queue.put(block=0)/put_nowait()." | 
 | 14 |     pass | 
| Guido van Rossum | 9022fce | 1992-08-25 12:30:44 +0000 | [diff] [blame] | 15 |  | 
 | 16 | class Queue: | 
| Skip Montanaro | 86116e2 | 2006-06-10 14:09:11 +0000 | [diff] [blame] | 17 |     """Create a queue object with a given maximum size. | 
| Guido van Rossum | 9022fce | 1992-08-25 12:30:44 +0000 | [diff] [blame] | 18 |  | 
| Skip Montanaro | 86116e2 | 2006-06-10 14:09:11 +0000 | [diff] [blame] | 19 |     If maxsize is <= 0, the queue size is infinite. | 
 | 20 |     """ | 
 | 21 |     def __init__(self, maxsize=0): | 
| Guido van Rossum | a093424 | 2002-12-30 22:36:09 +0000 | [diff] [blame] | 22 |         try: | 
| Tim Peters | 5af0e41 | 2004-07-12 00:45:14 +0000 | [diff] [blame] | 23 |             import threading | 
| Guido van Rossum | a093424 | 2002-12-30 22:36:09 +0000 | [diff] [blame] | 24 |         except ImportError: | 
| Tim Peters | 5af0e41 | 2004-07-12 00:45:14 +0000 | [diff] [blame] | 25 |             import dummy_threading as threading | 
| Guido van Rossum | 45e2fbc | 1998-03-26 21:13:24 +0000 | [diff] [blame] | 26 |         self._init(maxsize) | 
| Tim Peters | 5af0e41 | 2004-07-12 00:45:14 +0000 | [diff] [blame] | 27 |         # mutex must be held whenever the queue is mutating.  All methods | 
 | 28 |         # that acquire mutex must release it before returning.  mutex | 
| Raymond Hettinger | a3c7767 | 2006-11-23 21:35:19 +0000 | [diff] [blame] | 29 |         # is shared between the three conditions, so acquiring and | 
| Tim Peters | 5af0e41 | 2004-07-12 00:45:14 +0000 | [diff] [blame] | 30 |         # releasing the conditions also acquires and releases mutex. | 
 | 31 |         self.mutex = threading.Lock() | 
 | 32 |         # Notify not_empty whenever an item is added to the queue; a | 
 | 33 |         # thread waiting to get is notified then. | 
 | 34 |         self.not_empty = threading.Condition(self.mutex) | 
 | 35 |         # Notify not_full whenever an item is removed from the queue; | 
 | 36 |         # a thread waiting to put is notified then. | 
 | 37 |         self.not_full = threading.Condition(self.mutex) | 
| Raymond Hettinger | fd3fcf0 | 2006-03-24 20:43:29 +0000 | [diff] [blame] | 38 |         # Notify all_tasks_done whenever the number of unfinished tasks | 
 | 39 |         # drops to zero; thread waiting to join() is notified to resume | 
 | 40 |         self.all_tasks_done = threading.Condition(self.mutex) | 
 | 41 |         self.unfinished_tasks = 0 | 
 | 42 |  | 
 | 43 |     def task_done(self): | 
 | 44 |         """Indicate that a formerly enqueued task is complete. | 
 | 45 |  | 
 | 46 |         Used by Queue consumer threads.  For each get() used to fetch a task, | 
 | 47 |         a subsequent call to task_done() tells the queue that the processing | 
 | 48 |         on the task is complete. | 
 | 49 |  | 
 | 50 |         If a join() is currently blocking, it will resume when all items | 
 | 51 |         have been processed (meaning that a task_done() call was received | 
 | 52 |         for every item that had been put() into the queue). | 
 | 53 |  | 
 | 54 |         Raises a ValueError if called more times than there were items | 
 | 55 |         placed in the queue. | 
 | 56 |         """ | 
 | 57 |         self.all_tasks_done.acquire() | 
 | 58 |         try: | 
| Raymond Hettinger | c4e94b9 | 2006-03-25 12:15:04 +0000 | [diff] [blame] | 59 |             unfinished = self.unfinished_tasks - 1 | 
| Raymond Hettinger | fd3fcf0 | 2006-03-24 20:43:29 +0000 | [diff] [blame] | 60 |             if unfinished <= 0: | 
 | 61 |                 if unfinished < 0: | 
| Tim Peters | e33901e | 2006-03-25 01:50:43 +0000 | [diff] [blame] | 62 |                     raise ValueError('task_done() called too many times') | 
| Raymond Hettinger | fd3fcf0 | 2006-03-24 20:43:29 +0000 | [diff] [blame] | 63 |                 self.all_tasks_done.notifyAll() | 
| Raymond Hettinger | c4e94b9 | 2006-03-25 12:15:04 +0000 | [diff] [blame] | 64 |             self.unfinished_tasks = unfinished | 
| Raymond Hettinger | fd3fcf0 | 2006-03-24 20:43:29 +0000 | [diff] [blame] | 65 |         finally: | 
 | 66 |             self.all_tasks_done.release() | 
 | 67 |  | 
 | 68 |     def join(self): | 
 | 69 |         """Blocks until all items in the Queue have been gotten and processed. | 
 | 70 |  | 
 | 71 |         The count of unfinished tasks goes up whenever an item is added to the | 
 | 72 |         queue. The count goes down whenever a consumer thread calls task_done() | 
 | 73 |         to indicate the item was retrieved and all work on it is complete. | 
 | 74 |  | 
 | 75 |         When the count of unfinished tasks drops to zero, join() unblocks. | 
 | 76 |         """ | 
 | 77 |         self.all_tasks_done.acquire() | 
 | 78 |         try: | 
 | 79 |             while self.unfinished_tasks: | 
 | 80 |                 self.all_tasks_done.wait() | 
 | 81 |         finally: | 
 | 82 |             self.all_tasks_done.release() | 
| Guido van Rossum | 9022fce | 1992-08-25 12:30:44 +0000 | [diff] [blame] | 83 |  | 
| Barry Warsaw | 3d96d52 | 1997-11-20 19:56:38 +0000 | [diff] [blame] | 84 |     def qsize(self): | 
| Guido van Rossum | 9e1721f | 1999-02-08 18:34:01 +0000 | [diff] [blame] | 85 |         """Return the approximate size of the queue (not reliable!).""" | 
| Guido van Rossum | 7e6d18c | 1998-04-29 14:29:32 +0000 | [diff] [blame] | 86 |         self.mutex.acquire() | 
| Guido van Rossum | 45e2fbc | 1998-03-26 21:13:24 +0000 | [diff] [blame] | 87 |         n = self._qsize() | 
| Guido van Rossum | 7e6d18c | 1998-04-29 14:29:32 +0000 | [diff] [blame] | 88 |         self.mutex.release() | 
| Guido van Rossum | 45e2fbc | 1998-03-26 21:13:24 +0000 | [diff] [blame] | 89 |         return n | 
| Barry Warsaw | 3d96d52 | 1997-11-20 19:56:38 +0000 | [diff] [blame] | 90 |  | 
 | 91 |     def empty(self): | 
| Martin v. Löwis | 77ac429 | 2002-10-15 15:11:13 +0000 | [diff] [blame] | 92 |         """Return True if the queue is empty, False otherwise (not reliable!).""" | 
| Guido van Rossum | 7e6d18c | 1998-04-29 14:29:32 +0000 | [diff] [blame] | 93 |         self.mutex.acquire() | 
| Guido van Rossum | 45e2fbc | 1998-03-26 21:13:24 +0000 | [diff] [blame] | 94 |         n = self._empty() | 
| Guido van Rossum | 7e6d18c | 1998-04-29 14:29:32 +0000 | [diff] [blame] | 95 |         self.mutex.release() | 
| Guido van Rossum | 45e2fbc | 1998-03-26 21:13:24 +0000 | [diff] [blame] | 96 |         return n | 
| Barry Warsaw | 3d96d52 | 1997-11-20 19:56:38 +0000 | [diff] [blame] | 97 |  | 
 | 98 |     def full(self): | 
| Martin v. Löwis | 77ac429 | 2002-10-15 15:11:13 +0000 | [diff] [blame] | 99 |         """Return True if the queue is full, False otherwise (not reliable!).""" | 
| Guido van Rossum | 7e6d18c | 1998-04-29 14:29:32 +0000 | [diff] [blame] | 100 |         self.mutex.acquire() | 
| Guido van Rossum | 45e2fbc | 1998-03-26 21:13:24 +0000 | [diff] [blame] | 101 |         n = self._full() | 
| Guido van Rossum | 7e6d18c | 1998-04-29 14:29:32 +0000 | [diff] [blame] | 102 |         self.mutex.release() | 
| Guido van Rossum | 45e2fbc | 1998-03-26 21:13:24 +0000 | [diff] [blame] | 103 |         return n | 
| Barry Warsaw | 3d96d52 | 1997-11-20 19:56:38 +0000 | [diff] [blame] | 104 |  | 
| Martin v. Löwis | 77ac429 | 2002-10-15 15:11:13 +0000 | [diff] [blame] | 105 |     def put(self, item, block=True, timeout=None): | 
| Guido van Rossum | c09e6b1 | 1998-04-09 14:25:32 +0000 | [diff] [blame] | 106 |         """Put an item into the queue. | 
 | 107 |  | 
| Martin v. Löwis | 77ac429 | 2002-10-15 15:11:13 +0000 | [diff] [blame] | 108 |         If optional args 'block' is true and 'timeout' is None (the default), | 
 | 109 |         block if necessary until a free slot is available. If 'timeout' is | 
 | 110 |         a positive number, it blocks at most 'timeout' seconds and raises | 
 | 111 |         the Full exception if no free slot was available within that time. | 
 | 112 |         Otherwise ('block' is false), put an item on the queue if a free slot | 
 | 113 |         is immediately available, else raise the Full exception ('timeout' | 
 | 114 |         is ignored in that case). | 
| Guido van Rossum | 9e1721f | 1999-02-08 18:34:01 +0000 | [diff] [blame] | 115 |         """ | 
| Tim Peters | 5af0e41 | 2004-07-12 00:45:14 +0000 | [diff] [blame] | 116 |         self.not_full.acquire() | 
| Mark Hammond | 3b959db | 2002-04-19 00:11:32 +0000 | [diff] [blame] | 117 |         try: | 
| Tim Peters | 71ed220 | 2004-07-12 01:20:32 +0000 | [diff] [blame] | 118 |             if not block: | 
 | 119 |                 if self._full(): | 
 | 120 |                     raise Full | 
 | 121 |             elif timeout is None: | 
| Tim Peters | 5af0e41 | 2004-07-12 00:45:14 +0000 | [diff] [blame] | 122 |                 while self._full(): | 
 | 123 |                     self.not_full.wait() | 
 | 124 |             else: | 
 | 125 |                 if timeout < 0: | 
 | 126 |                     raise ValueError("'timeout' must be a positive number") | 
 | 127 |                 endtime = _time() + timeout | 
 | 128 |                 while self._full(): | 
 | 129 |                     remaining = endtime - _time() | 
| Tim Peters | 71ed220 | 2004-07-12 01:20:32 +0000 | [diff] [blame] | 130 |                     if remaining <= 0.0: | 
| Tim Peters | 5af0e41 | 2004-07-12 00:45:14 +0000 | [diff] [blame] | 131 |                         raise Full | 
 | 132 |                     self.not_full.wait(remaining) | 
| Mark Hammond | 3b959db | 2002-04-19 00:11:32 +0000 | [diff] [blame] | 133 |             self._put(item) | 
| Raymond Hettinger | fd3fcf0 | 2006-03-24 20:43:29 +0000 | [diff] [blame] | 134 |             self.unfinished_tasks += 1 | 
| Tim Peters | 5af0e41 | 2004-07-12 00:45:14 +0000 | [diff] [blame] | 135 |             self.not_empty.notify() | 
| Mark Hammond | 3b959db | 2002-04-19 00:11:32 +0000 | [diff] [blame] | 136 |         finally: | 
| Tim Peters | 5af0e41 | 2004-07-12 00:45:14 +0000 | [diff] [blame] | 137 |             self.not_full.release() | 
| Barry Warsaw | 3d96d52 | 1997-11-20 19:56:38 +0000 | [diff] [blame] | 138 |  | 
| Guido van Rossum | 9e1721f | 1999-02-08 18:34:01 +0000 | [diff] [blame] | 139 |     def put_nowait(self, item): | 
 | 140 |         """Put an item into the queue without blocking. | 
| Guido van Rossum | c09e6b1 | 1998-04-09 14:25:32 +0000 | [diff] [blame] | 141 |  | 
| Guido van Rossum | 9e1721f | 1999-02-08 18:34:01 +0000 | [diff] [blame] | 142 |         Only enqueue the item if a free slot is immediately available. | 
 | 143 |         Otherwise raise the Full exception. | 
| Guido van Rossum | 45e2fbc | 1998-03-26 21:13:24 +0000 | [diff] [blame] | 144 |         """ | 
| Tim Peters | 71ed220 | 2004-07-12 01:20:32 +0000 | [diff] [blame] | 145 |         return self.put(item, False) | 
| Barry Warsaw | 3d96d52 | 1997-11-20 19:56:38 +0000 | [diff] [blame] | 146 |  | 
| Martin v. Löwis | 77ac429 | 2002-10-15 15:11:13 +0000 | [diff] [blame] | 147 |     def get(self, block=True, timeout=None): | 
| Guido van Rossum | 9e1721f | 1999-02-08 18:34:01 +0000 | [diff] [blame] | 148 |         """Remove and return an item from the queue. | 
| Guido van Rossum | c09e6b1 | 1998-04-09 14:25:32 +0000 | [diff] [blame] | 149 |  | 
| Martin v. Löwis | 77ac429 | 2002-10-15 15:11:13 +0000 | [diff] [blame] | 150 |         If optional args 'block' is true and 'timeout' is None (the default), | 
 | 151 |         block if necessary until an item is available. If 'timeout' is | 
 | 152 |         a positive number, it blocks at most 'timeout' seconds and raises | 
 | 153 |         the Empty exception if no item was available within that time. | 
 | 154 |         Otherwise ('block' is false), return an item if one is immediately | 
 | 155 |         available, else raise the Empty exception ('timeout' is ignored | 
 | 156 |         in that case). | 
| Guido van Rossum | 45e2fbc | 1998-03-26 21:13:24 +0000 | [diff] [blame] | 157 |         """ | 
| Tim Peters | 5af0e41 | 2004-07-12 00:45:14 +0000 | [diff] [blame] | 158 |         self.not_empty.acquire() | 
| Mark Hammond | 3b959db | 2002-04-19 00:11:32 +0000 | [diff] [blame] | 159 |         try: | 
| Tim Peters | 71ed220 | 2004-07-12 01:20:32 +0000 | [diff] [blame] | 160 |             if not block: | 
 | 161 |                 if self._empty(): | 
 | 162 |                     raise Empty | 
 | 163 |             elif timeout is None: | 
| Tim Peters | 5af0e41 | 2004-07-12 00:45:14 +0000 | [diff] [blame] | 164 |                 while self._empty(): | 
 | 165 |                     self.not_empty.wait() | 
 | 166 |             else: | 
 | 167 |                 if timeout < 0: | 
 | 168 |                     raise ValueError("'timeout' must be a positive number") | 
 | 169 |                 endtime = _time() + timeout | 
 | 170 |                 while self._empty(): | 
 | 171 |                     remaining = endtime - _time() | 
| Tim Peters | 71ed220 | 2004-07-12 01:20:32 +0000 | [diff] [blame] | 172 |                     if remaining <= 0.0: | 
| Tim Peters | 5af0e41 | 2004-07-12 00:45:14 +0000 | [diff] [blame] | 173 |                         raise Empty | 
 | 174 |                     self.not_empty.wait(remaining) | 
| Mark Hammond | 3b959db | 2002-04-19 00:11:32 +0000 | [diff] [blame] | 175 |             item = self._get() | 
| Tim Peters | 5af0e41 | 2004-07-12 00:45:14 +0000 | [diff] [blame] | 176 |             self.not_full.notify() | 
 | 177 |             return item | 
| Mark Hammond | 3b959db | 2002-04-19 00:11:32 +0000 | [diff] [blame] | 178 |         finally: | 
| Tim Peters | 5af0e41 | 2004-07-12 00:45:14 +0000 | [diff] [blame] | 179 |             self.not_empty.release() | 
| Guido van Rossum | 9022fce | 1992-08-25 12:30:44 +0000 | [diff] [blame] | 180 |  | 
| Guido van Rossum | 9e1721f | 1999-02-08 18:34:01 +0000 | [diff] [blame] | 181 |     def get_nowait(self): | 
 | 182 |         """Remove and return an item from the queue without blocking. | 
| Guido van Rossum | 9022fce | 1992-08-25 12:30:44 +0000 | [diff] [blame] | 183 |  | 
| Martin v. Löwis | 77ac429 | 2002-10-15 15:11:13 +0000 | [diff] [blame] | 184 |         Only get an item if one is immediately available. Otherwise | 
| Guido van Rossum | 9e1721f | 1999-02-08 18:34:01 +0000 | [diff] [blame] | 185 |         raise the Empty exception. | 
 | 186 |         """ | 
| Tim Peters | 71ed220 | 2004-07-12 01:20:32 +0000 | [diff] [blame] | 187 |         return self.get(False) | 
| Guido van Rossum | 9022fce | 1992-08-25 12:30:44 +0000 | [diff] [blame] | 188 |  | 
| Barry Warsaw | 3d96d52 | 1997-11-20 19:56:38 +0000 | [diff] [blame] | 189 |     # Override these methods to implement other queue organizations | 
 | 190 |     # (e.g. stack or priority queue). | 
 | 191 |     # These will only be called with appropriate locks held | 
| Guido van Rossum | 9022fce | 1992-08-25 12:30:44 +0000 | [diff] [blame] | 192 |  | 
| Barry Warsaw | 3d96d52 | 1997-11-20 19:56:38 +0000 | [diff] [blame] | 193 |     # Initialize the queue representation | 
 | 194 |     def _init(self, maxsize): | 
| Guido van Rossum | 45e2fbc | 1998-03-26 21:13:24 +0000 | [diff] [blame] | 195 |         self.maxsize = maxsize | 
| Raymond Hettinger | 756b3f3 | 2004-01-29 06:37:52 +0000 | [diff] [blame] | 196 |         self.queue = deque() | 
| Guido van Rossum | 9022fce | 1992-08-25 12:30:44 +0000 | [diff] [blame] | 197 |  | 
| Barry Warsaw | 3d96d52 | 1997-11-20 19:56:38 +0000 | [diff] [blame] | 198 |     def _qsize(self): | 
| Guido van Rossum | 45e2fbc | 1998-03-26 21:13:24 +0000 | [diff] [blame] | 199 |         return len(self.queue) | 
| Guido van Rossum | 9022fce | 1992-08-25 12:30:44 +0000 | [diff] [blame] | 200 |  | 
| Jeremy Hylton | a05e293 | 2000-06-28 14:48:01 +0000 | [diff] [blame] | 201 |     # Check whether the queue is empty | 
| Barry Warsaw | 3d96d52 | 1997-11-20 19:56:38 +0000 | [diff] [blame] | 202 |     def _empty(self): | 
| Guido van Rossum | 45e2fbc | 1998-03-26 21:13:24 +0000 | [diff] [blame] | 203 |         return not self.queue | 
| Guido van Rossum | 9022fce | 1992-08-25 12:30:44 +0000 | [diff] [blame] | 204 |  | 
| Barry Warsaw | 3d96d52 | 1997-11-20 19:56:38 +0000 | [diff] [blame] | 205 |     # Check whether the queue is full | 
 | 206 |     def _full(self): | 
| Guido van Rossum | 45e2fbc | 1998-03-26 21:13:24 +0000 | [diff] [blame] | 207 |         return self.maxsize > 0 and len(self.queue) == self.maxsize | 
| Guido van Rossum | 9022fce | 1992-08-25 12:30:44 +0000 | [diff] [blame] | 208 |  | 
| Barry Warsaw | 3d96d52 | 1997-11-20 19:56:38 +0000 | [diff] [blame] | 209 |     # Put a new item in the queue | 
 | 210 |     def _put(self, item): | 
| Guido van Rossum | 45e2fbc | 1998-03-26 21:13:24 +0000 | [diff] [blame] | 211 |         self.queue.append(item) | 
| Guido van Rossum | 9022fce | 1992-08-25 12:30:44 +0000 | [diff] [blame] | 212 |  | 
| Barry Warsaw | 3d96d52 | 1997-11-20 19:56:38 +0000 | [diff] [blame] | 213 |     # Get an item from the queue | 
 | 214 |     def _get(self): | 
| Raymond Hettinger | 756b3f3 | 2004-01-29 06:37:52 +0000 | [diff] [blame] | 215 |         return self.queue.popleft() |