blob: 524560b691d720d6bf39a6aab2517e0a19c57d3d [file] [log] [blame]
Victor Stinner615a58e2015-02-25 13:55:43 +01001.. currentmodule:: asyncio
2
Yury Selivanov6c731642018-09-14 14:57:39 -07003.. _asyncio-queues:
Yury Selivanov7c7605f2018-09-11 09:54:40 -07004
5======
Victor Stinner615a58e2015-02-25 13:55:43 +01006Queues
7======
8
Kyle Stanleyf9000642019-10-10 19:18:46 -04009**Source code:** :source:`Lib/asyncio/queues.py`
10
11------------------------------------------------
12
Yury Selivanov7c7605f2018-09-11 09:54:40 -070013asyncio queues are designed to be similar to classes of the
14:mod:`queue` module. Although asyncio queues are not thread-safe,
15they are designed to be used specifically in async/await code.
lf627d2c82017-07-25 17:03:51 -060016
Elvis Pranskevichus1fa2ec42018-09-17 19:16:44 -040017Note that methods of asyncio queues don't have a *timeout* parameter;
Yury Selivanov7c7605f2018-09-11 09:54:40 -070018use :func:`asyncio.wait_for` function to do queue operations with a
19timeout.
Victor Stinner615a58e2015-02-25 13:55:43 +010020
Yury Selivanov7c7605f2018-09-11 09:54:40 -070021See also the `Examples`_ section below.
Victor Stinner615a58e2015-02-25 13:55:43 +010022
23Queue
Yury Selivanov7c7605f2018-09-11 09:54:40 -070024=====
Victor Stinner615a58e2015-02-25 13:55:43 +010025
26.. class:: Queue(maxsize=0, \*, loop=None)
27
Yury Selivanov7c7605f2018-09-11 09:54:40 -070028 A first in, first out (FIFO) queue.
Victor Stinner615a58e2015-02-25 13:55:43 +010029
Yury Selivanov7c7605f2018-09-11 09:54:40 -070030 If *maxsize* is less than or equal to zero, the queue size is
31 infinite. If it is an integer greater than ``0``, then
32 ``await put()`` blocks when the queue reaches *maxsize*
33 until an item is removed by :meth:`get`.
Victor Stinner615a58e2015-02-25 13:55:43 +010034
Yury Selivanov7c7605f2018-09-11 09:54:40 -070035 Unlike the standard library threading :mod:`queue`, the size of
36 the queue is always known and can be returned by calling the
37 :meth:`qsize` method.
Victor Stinner615a58e2015-02-25 13:55:43 +010038
Emmanuel Arias9008be32019-09-10 08:46:12 -030039 .. deprecated-removed:: 3.8 3.10
40 The *loop* parameter.
41
42
Victor Stinner83704962015-02-25 14:24:15 +010043 This class is :ref:`not thread safe <asyncio-multithreading>`.
44
Yury Selivanov7c7605f2018-09-11 09:54:40 -070045 .. attribute:: maxsize
46
47 Number of items allowed in the queue.
Victor Stinner615a58e2015-02-25 13:55:43 +010048
49 .. method:: empty()
50
51 Return ``True`` if the queue is empty, ``False`` otherwise.
52
53 .. method:: full()
54
55 Return ``True`` if there are :attr:`maxsize` items in the queue.
56
Yury Selivanov7c7605f2018-09-11 09:54:40 -070057 If the queue was initialized with ``maxsize=0`` (the default),
58 then :meth:`full()` never returns ``True``.
Victor Stinner615a58e2015-02-25 13:55:43 +010059
60 .. coroutinemethod:: get()
61
Yury Selivanov7c7605f2018-09-11 09:54:40 -070062 Remove and return an item from the queue. If queue is empty,
63 wait until an item is available.
Victor Stinner615a58e2015-02-25 13:55:43 +010064
65 .. method:: get_nowait()
66
Victor Stinner615a58e2015-02-25 13:55:43 +010067 Return an item if one is immediately available, else raise
68 :exc:`QueueEmpty`.
69
70 .. coroutinemethod:: join()
71
Carol Willingc9d66f02018-09-14 10:06:55 -070072 Block until all items in the queue have been received and processed.
Victor Stinner615a58e2015-02-25 13:55:43 +010073
Yury Selivanov7c7605f2018-09-11 09:54:40 -070074 The count of unfinished tasks goes up whenever an item is added
Slam97e12992019-01-17 13:52:17 +020075 to the queue. The count goes down whenever a consumer coroutine calls
Yury Selivanov7c7605f2018-09-11 09:54:40 -070076 :meth:`task_done` to indicate that the item was retrieved and all
77 work on it is complete. When the count of unfinished tasks drops
78 to zero, :meth:`join` unblocks.
Victor Stinner615a58e2015-02-25 13:55:43 +010079
80 .. coroutinemethod:: put(item)
81
Yury Selivanov7c7605f2018-09-11 09:54:40 -070082 Put an item into the queue. If the queue is full, wait until a
Elvis Pranskevichus1fa2ec42018-09-17 19:16:44 -040083 free slot is available before adding the item.
Victor Stinner615a58e2015-02-25 13:55:43 +010084
85 .. method:: put_nowait(item)
86
87 Put an item into the queue without blocking.
88
89 If no free slot is immediately available, raise :exc:`QueueFull`.
90
91 .. method:: qsize()
92
Elvis Pranskevichus1fa2ec42018-09-17 19:16:44 -040093 Return the number of items in the queue.
Victor Stinner615a58e2015-02-25 13:55:43 +010094
95 .. method:: task_done()
96
97 Indicate that a formerly enqueued task is complete.
98
Yury Selivanov7c7605f2018-09-11 09:54:40 -070099 Used by queue consumers. For each :meth:`~Queue.get` used to
100 fetch a task, a subsequent call to :meth:`task_done` tells the
101 queue that the processing on the task is complete.
Victor Stinner615a58e2015-02-25 13:55:43 +0100102
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700103 If a :meth:`join` is currently blocking, it will resume when all
104 items have been processed (meaning that a :meth:`task_done`
105 call was received for every item that had been :meth:`~Queue.put`
106 into the queue).
Victor Stinner615a58e2015-02-25 13:55:43 +0100107
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700108 Raises :exc:`ValueError` if called more times than there were
109 items placed in the queue.
Victor Stinner615a58e2015-02-25 13:55:43 +0100110
111
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700112Priority Queue
113==============
Victor Stinner615a58e2015-02-25 13:55:43 +0100114
115.. class:: PriorityQueue
116
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700117 A variant of :class:`Queue`; retrieves entries in priority order
118 (lowest first).
Victor Stinner615a58e2015-02-25 13:55:43 +0100119
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700120 Entries are typically tuples of the form
121 ``(priority_number, data)``.
Victor Stinner615a58e2015-02-25 13:55:43 +0100122
123
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700124LIFO Queue
125==========
Victor Stinner615a58e2015-02-25 13:55:43 +0100126
127.. class:: LifoQueue
128
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700129 A variant of :class:`Queue` that retrieves most recently added
130 entries first (last in, first out).
Victor Stinner615a58e2015-02-25 13:55:43 +0100131
132
Victor Stinner615a58e2015-02-25 13:55:43 +0100133Exceptions
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700134==========
Victor Stinner615a58e2015-02-25 13:55:43 +0100135
136.. exception:: QueueEmpty
137
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700138 This exception is raised when the :meth:`~Queue.get_nowait` method
139 is called on an empty queue.
Victor Stinner615a58e2015-02-25 13:55:43 +0100140
141
142.. exception:: QueueFull
143
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700144 Exception raised when the :meth:`~Queue.put_nowait` method is called
145 on a queue that has reached its *maxsize*.
146
147
148Examples
149========
150
Yury Selivanov7372c3b2018-09-14 15:11:24 -0700151.. _asyncio_example_queue_dist:
152
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700153Queues can be used to distribute workload between several
154concurrent tasks::
155
156 import asyncio
157 import random
158 import time
159
160
161 async def worker(name, queue):
162 while True:
163 # Get a "work item" out of the queue.
164 sleep_for = await queue.get()
165
166 # Sleep for the "sleep_for" seconds.
167 await asyncio.sleep(sleep_for)
168
169 # Notify the queue that the "work item" has been processed.
170 queue.task_done()
171
172 print(f'{name} has slept for {sleep_for:.2f} seconds')
173
174
175 async def main():
176 # Create a queue that we will use to store our "workload".
177 queue = asyncio.Queue()
178
179 # Generate random timings and put them into the queue.
180 total_sleep_time = 0
181 for _ in range(20):
182 sleep_for = random.uniform(0.05, 1.0)
183 total_sleep_time += sleep_for
184 queue.put_nowait(sleep_for)
185
186 # Create three worker tasks to process the queue concurrently.
187 tasks = []
188 for i in range(3):
189 task = asyncio.create_task(worker(f'worker-{i}', queue))
190 tasks.append(task)
191
192 # Wait until the queue is fully processed.
193 started_at = time.monotonic()
194 await queue.join()
195 total_slept_for = time.monotonic() - started_at
196
197 # Cancel our worker tasks.
198 for task in tasks:
199 task.cancel()
200 # Wait until all worker tasks are cancelled.
201 await asyncio.gather(*tasks, return_exceptions=True)
202
203 print('====')
204 print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
205 print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
206
207
208 asyncio.run(main())