.. currentmodule:: asyncio


======
Queues
======

asyncio queues are designed to be similar to classes of the
:mod:`queue` module. Although asyncio queues are not thread-safe,
they are designed to be used specifically in async/await code.

Note that methods on asyncio queues don't have a *timeout* parameter;
use the :func:`asyncio.wait_for` function to do queue operations with a
timeout.
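
For example, a minimal sketch of a ``get()`` with a timeout might look
like the following. The helper name ``get_with_timeout`` and the
0.1 second limit are illustrative assumptions, not part of asyncio::

   import asyncio


   async def get_with_timeout(queue, timeout):
       # Hypothetical helper: wait at most *timeout* seconds for an item.
       try:
           return await asyncio.wait_for(queue.get(), timeout)
       except asyncio.TimeoutError:
           # wait_for() cancels the pending get() before raising.
           return None


   async def main():
       queue = asyncio.Queue()
       # The queue is empty, so this gives up after the timeout.
       first = await get_with_timeout(queue, 0.1)
       queue.put_nowait('item')
       second = await get_with_timeout(queue, 0.1)
       return first, second


   result = asyncio.run(main())
   print(result)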

See also the `Examples`_ section below.

Queue
=====

.. class:: Queue(maxsize=0, \*, loop=None)

   A first in, first out (FIFO) queue.

   If *maxsize* is less than or equal to zero, the queue size is
   infinite. If it is an integer greater than ``0``, then
   ``await put()`` blocks when the queue reaches *maxsize*
   until an item is removed by :meth:`get`.

   Unlike the standard library threading :mod:`queue`, the size of
   the queue is always known and can be returned by calling the
   :meth:`qsize` method.

   This class is :ref:`not thread safe <asyncio-multithreading>`.

   .. attribute:: maxsize

      Number of items allowed in the queue.

   .. method:: empty()

      Return ``True`` if the queue is empty, ``False`` otherwise.

   .. method:: full()

      Return ``True`` if there are :attr:`maxsize` items in the queue.

      If the queue was initialized with ``maxsize=0`` (the default),
      then :meth:`full()` never returns ``True``.

   .. coroutinemethod:: get()

      Remove and return an item from the queue. If the queue is empty,
      wait until an item is available.

   .. method:: get_nowait()

      Return an item if one is immediately available, else raise
      :exc:`QueueEmpty`.
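
      A minimal sketch of draining whatever is currently queued without
      waiting (``drain`` is a hypothetical helper name, not part of
      asyncio)::

         import asyncio


         def drain(queue):
             # Collect items until get_nowait() reports an empty queue.
             items = []
             while True:
                 try:
                     items.append(queue.get_nowait())
                 except asyncio.QueueEmpty:
                     return items


         async def main():
             queue = asyncio.Queue()
             for n in range(3):
                 queue.put_nowait(n)
             return drain(queue)


         drained = asyncio.run(main())
         print(drained)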

   .. coroutinemethod:: join()

      Block until all items in the queue have been received and processed.

      The count of unfinished tasks goes up whenever an item is added
      to the queue. The count goes down whenever a consumer coroutine calls
      :meth:`task_done` to indicate that the item was retrieved and all
      work on it is complete. When the count of unfinished tasks drops
      to zero, :meth:`join` unblocks.

   .. coroutinemethod:: put(item)

      Put an item into the queue. If the queue is full, wait until a
      free slot is available before adding the item.

   .. method:: put_nowait(item)

      Put an item into the queue without blocking.

      If no free slot is immediately available, raise :exc:`QueueFull`.
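
      A minimal sketch of how a bounded queue reports that it is full;
      the ``maxsize=2`` value and the item names are arbitrary choices
      for illustration::

         import asyncio


         async def main():
             queue = asyncio.Queue(maxsize=2)
             queue.put_nowait('a')
             queue.put_nowait('b')
             was_full = queue.full()

             try:
                 # No free slot is left, so this raises immediately.
                 queue.put_nowait('c')
                 rejected = False
             except asyncio.QueueFull:
                 rejected = True

             # Removing an item frees a slot for the next put.
             queue.get_nowait()
             queue.put_nowait('c')
             return was_full, rejected, queue.qsize()


         state = asyncio.run(main())
         print(state)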

   .. method:: qsize()

      Return the number of items in the queue.

   .. method:: task_done()

      Indicate that a formerly enqueued task is complete.

      Used by queue consumers. For each :meth:`~Queue.get` used to
      fetch a task, a subsequent call to :meth:`task_done` tells the
      queue that the processing on the task is complete.

      If a :meth:`join` is currently blocking, it will resume when all
      items have been processed (meaning that a :meth:`task_done`
      call was received for every item that had been :meth:`~Queue.put`
      into the queue).

      Raises :exc:`ValueError` if called more times than there were
      items placed in the queue.
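
      A minimal sketch of the pairing between :meth:`~Queue.get` and
      :meth:`task_done`, including the error raised by an unmatched
      call (the item value is an arbitrary placeholder)::

         import asyncio


         async def main():
             queue = asyncio.Queue()
             queue.put_nowait('job')

             await queue.get()
             queue.task_done()   # Matches the single item that was put.
             await queue.join()  # Returns at once: nothing is unfinished.

             try:
                 queue.task_done()  # One call too many.
             except ValueError:
                 return 'extra task_done() rejected'
             return 'no error'


         outcome = asyncio.run(main())
         print(outcome)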


Priority Queue
==============

.. class:: PriorityQueue

   A variant of :class:`Queue`; retrieves entries in priority order
   (lowest first).

   Entries are typically tuples of the form
   ``(priority_number, data)``.
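
   A minimal sketch of the ordering, using made-up priorities and
   labels::

      import asyncio


      async def main():
          queue = asyncio.PriorityQueue()
          # Entries are (priority_number, data) tuples, added out of order.
          queue.put_nowait((2, 'write report'))
          queue.put_nowait((1, 'fix outage'))
          queue.put_nowait((3, 'tidy backlog'))

          order = []
          while not queue.empty():
              priority, data = await queue.get()
              order.append(data)
          return order


      order = asyncio.run(main())
      print(order)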


LIFO Queue
==========

.. class:: LifoQueue

   A variant of :class:`Queue` that retrieves most recently added
   entries first (last in, first out).
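
   A minimal sketch of the last in, first out behaviour (the integer
   items are arbitrary)::

      import asyncio


      async def main():
          queue = asyncio.LifoQueue()
          for n in (1, 2, 3):
              queue.put_nowait(n)
          # Items come back in reverse order of insertion.
          return [queue.get_nowait() for _ in range(3)]


      lifo_order = asyncio.run(main())
      print(lifo_order)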


Exceptions
==========

.. exception:: QueueEmpty

   This exception is raised when the :meth:`~Queue.get_nowait` method
   is called on an empty queue.


.. exception:: QueueFull

   Exception raised when the :meth:`~Queue.put_nowait` method is called
   on a queue that has reached its *maxsize*.


Examples
========

Queues can be used to distribute workload between several
concurrent tasks::

   import asyncio
   import random
   import time


   async def worker(name, queue):
       while True:
           # Get a "work item" out of the queue.
           sleep_for = await queue.get()

           # Sleep for the "sleep_for" seconds.
           await asyncio.sleep(sleep_for)

           # Notify the queue that the "work item" has been processed.
           queue.task_done()

           print(f'{name} has slept for {sleep_for:.2f} seconds')


   async def main():
       # Create a queue that we will use to store our "workload".
       queue = asyncio.Queue()

       # Generate random timings and put them into the queue.
       total_sleep_time = 0
       for _ in range(20):
           sleep_for = random.uniform(0.05, 1.0)
           total_sleep_time += sleep_for
           queue.put_nowait(sleep_for)

       # Create three worker tasks to process the queue concurrently.
       tasks = []
       for i in range(3):
           task = asyncio.create_task(worker(f'worker-{i}', queue))
           tasks.append(task)

       # Wait until the queue is fully processed.
       started_at = time.monotonic()
       await queue.join()
       total_slept_for = time.monotonic() - started_at

       # Cancel our worker tasks.
       for task in tasks:
           task.cancel()
       # Wait until all worker tasks are cancelled.
       await asyncio.gather(*tasks, return_exceptions=True)

       print('====')
       print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
       print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


   asyncio.run(main())