.. currentmodule:: asyncio

.. _asyncio-queues:

======
Queues
======

**Source code:** :source:`Lib/asyncio/queues.py`

------------------------------------------------

asyncio queues are designed to be similar to the classes of the
:mod:`queue` module. Unlike those classes, asyncio queues are not
thread-safe; they are designed to be used specifically in async/await code.

Note that methods of asyncio queues don't have a *timeout* parameter;
use the :func:`asyncio.wait_for` function to do queue operations with a
timeout.
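
The timeout pattern described above can be sketched as follows. This is a
minimal illustration, not a prescribed idiom: the helper name
``get_with_timeout`` and the choice to return ``None`` on timeout are
assumptions made for this example and are not part of asyncio::

   import asyncio


   async def get_with_timeout(queue, timeout):
       """Return the next item, or None if none arrives within *timeout* seconds."""
       try:
           # wait_for() cancels the pending get() when the timeout expires.
           return await asyncio.wait_for(queue.get(), timeout)
       except asyncio.TimeoutError:
           return None


   async def main():
       queue = asyncio.Queue()
       # The queue is empty, so this call gives up after 0.1 seconds.
       print(await get_with_timeout(queue, 0.1))
       queue.put_nowait('job')
       # An item is available now, so it is returned immediately.
       print(await get_with_timeout(queue, 0.1))


   asyncio.run(main())

Run as written, the sketch prints ``None`` followed by ``job``.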

See also the `Examples`_ section below.

Queue
=====

.. class:: Queue(maxsize=0)

   A first in, first out (FIFO) queue.

   If *maxsize* is less than or equal to zero, the queue size is
   infinite. If it is an integer greater than ``0``, then
   ``await put()`` blocks when the queue reaches *maxsize*
   until an item is removed by :meth:`get`.
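
   The blocking behaviour of a bounded queue can be observed with a
   small sketch. It assumes a queue with ``maxsize=1``; the item values
   and the ``blocked`` flag are illustrative only::

      import asyncio


      async def main():
          queue = asyncio.Queue(maxsize=1)
          await queue.put('first')       # Fills the only slot.

          # A second put() has to wait, so run it as a separate task.
          producer = asyncio.create_task(queue.put('second'))
          await asyncio.sleep(0)         # Let the producer start and block.
          blocked = not producer.done()

          first = await queue.get()      # Frees a slot and wakes the producer.
          await producer
          second = await queue.get()
          return blocked, first, second


      print(asyncio.run(main()))

   Run as written, the sketch prints ``(True, 'first', 'second')``: the
   second ``put()`` only completes after ``get()`` has removed an item.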

   Unlike the standard library threading :mod:`queue`, the size of
   the queue is always known and can be returned by calling the
   :meth:`qsize` method.

   This class is :ref:`not thread safe <asyncio-multithreading>`.

   .. attribute:: maxsize

      Number of items allowed in the queue.

   .. method:: empty()

      Return ``True`` if the queue is empty, ``False`` otherwise.

   .. method:: full()

      Return ``True`` if there are :attr:`maxsize` items in the queue.

      If the queue was initialized with ``maxsize=0`` (the default),
      then :meth:`full()` never returns ``True``.

   .. coroutinemethod:: get()

      Remove and return an item from the queue. If the queue is empty,
      wait until an item is available.

   .. method:: get_nowait()

      Return an item if one is immediately available, else raise
      :exc:`QueueEmpty`.

   .. coroutinemethod:: join()

      Block until all items in the queue have been received and processed.

      The count of unfinished tasks goes up whenever an item is added
      to the queue. The count goes down whenever a consumer coroutine calls
      :meth:`task_done` to indicate that the item was retrieved and all
      work on it is complete. When the count of unfinished tasks drops
      to zero, :meth:`join` unblocks.
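
      The counting described above can be followed in a short sketch.
      The ``consumer`` coroutine and the doubling of each item are
      assumptions made for illustration::

         import asyncio


         async def consumer(queue, results):
             while True:
                 item = await queue.get()
                 results.append(item * 2)
                 queue.task_done()      # One unfinished task fewer.


         async def main():
             queue = asyncio.Queue()
             results = []
             for n in range(3):
                 queue.put_nowait(n)    # Unfinished count: 1, 2, 3.

             task = asyncio.create_task(consumer(queue, results))
             await queue.join()         # Resumes once the count is zero.

             # The consumer loops forever, so cancel it explicitly.
             task.cancel()
             await asyncio.gather(task, return_exceptions=True)
             return results


         print(asyncio.run(main()))

      Run as written, the sketch prints ``[0, 2, 4]``.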

   .. coroutinemethod:: put(item)

      Put an item into the queue. If the queue is full, wait until a
      free slot is available before adding the item.

   .. method:: put_nowait(item)

      Put an item into the queue without blocking.

      If no free slot is immediately available, raise :exc:`QueueFull`.

   .. method:: qsize()

      Return the number of items in the queue.

   .. method:: task_done()

      Indicate that a formerly enqueued task is complete.

      Used by queue consumers. For each :meth:`~Queue.get` used to
      fetch a task, a subsequent call to :meth:`task_done` tells the
      queue that the processing on the task is complete.

      If a :meth:`join` is currently blocking, it will resume when all
      items have been processed (meaning that a :meth:`task_done`
      call was received for every item that had been :meth:`~Queue.put`
      into the queue).

      Raises :exc:`ValueError` if called more times than there were
      items placed in the queue.
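
      The :exc:`ValueError` case can be reproduced with a minimal
      sketch, assuming a queue that has received exactly one item; the
      boolean return value is only there to make the outcome visible::

         import asyncio


         async def main():
             queue = asyncio.Queue()
             queue.put_nowait('item')
             await queue.get()
             queue.task_done()          # Matches the single item put.
             try:
                 queue.task_done()      # One call more than items put.
             except ValueError:
                 return True
             return False


         print(asyncio.run(main()))

      Run as written, the sketch prints ``True``.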


Priority Queue
==============

.. class:: PriorityQueue

   A variant of :class:`Queue`; retrieves entries in priority order
   (lowest first).

   Entries are typically tuples of the form
   ``(priority_number, data)``.
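
   A minimal sketch of the ``(priority_number, data)`` convention; the
   task descriptions are made up for illustration::

      import asyncio


      async def main():
          queue = asyncio.PriorityQueue()
          queue.put_nowait((2, 'write report'))
          queue.put_nowait((1, 'fix outage'))
          queue.put_nowait((3, 'refill coffee'))

          order = []
          while not queue.empty():
              # The entry with the lowest priority number comes out first.
              priority, task = await queue.get()
              order.append(task)
          return order


      print(asyncio.run(main()))

   Run as written, the sketch prints
   ``['fix outage', 'write report', 'refill coffee']``.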


LIFO Queue
==========

.. class:: LifoQueue

   A variant of :class:`Queue` that retrieves most recently added
   entries first (last in, first out).
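
   A minimal sketch of the last in, first out ordering, using
   illustrative single-letter items::

      import asyncio


      async def main():
          queue = asyncio.LifoQueue()
          for item in ('a', 'b', 'c'):
              queue.put_nowait(item)
          # Items come back in the reverse of the order they were added.
          return [queue.get_nowait() for _ in range(queue.qsize())]


      print(asyncio.run(main()))

   Run as written, the sketch prints ``['c', 'b', 'a']``.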


Exceptions
==========

.. exception:: QueueEmpty

   This exception is raised when the :meth:`~Queue.get_nowait` method
   is called on an empty queue.


.. exception:: QueueFull

   Exception raised when the :meth:`~Queue.put_nowait` method is called
   on a queue that has reached its *maxsize*.
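
Both exceptions can be triggered with a short sketch, assuming a queue
with ``maxsize=1``; the ``events`` list is only there to record which
exception was raised::

   import asyncio


   async def main():
       queue = asyncio.Queue(maxsize=1)
       events = []

       try:
           queue.get_nowait()                  # Nothing to return yet.
       except asyncio.QueueEmpty:
           events.append('empty')

       queue.put_nowait('only item')           # Fills the single slot.
       try:
           queue.put_nowait('one too many')    # No free slot left.
       except asyncio.QueueFull:
           events.append('full')

       return events


   print(asyncio.run(main()))

Run as written, the sketch prints ``['empty', 'full']``.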


Examples
========

.. _asyncio_example_queue_dist:

Queues can be used to distribute workload between several
concurrent tasks::

   import asyncio
   import random
   import time


   async def worker(name, queue):
       while True:
           # Get a "work item" out of the queue.
           sleep_for = await queue.get()

           # Sleep for the "sleep_for" seconds.
           await asyncio.sleep(sleep_for)

           # Notify the queue that the "work item" has been processed.
           queue.task_done()

           print(f'{name} has slept for {sleep_for:.2f} seconds')


   async def main():
       # Create a queue that we will use to store our "workload".
       queue = asyncio.Queue()

       # Generate random timings and put them into the queue.
       total_sleep_time = 0
       for _ in range(20):
           sleep_for = random.uniform(0.05, 1.0)
           total_sleep_time += sleep_for
           queue.put_nowait(sleep_for)

       # Create three worker tasks to process the queue concurrently.
       tasks = []
       for i in range(3):
           task = asyncio.create_task(worker(f'worker-{i}', queue))
           tasks.append(task)

       # Wait until the queue is fully processed.
       started_at = time.monotonic()
       await queue.join()
       total_slept_for = time.monotonic() - started_at

       # Cancel our worker tasks.
       for task in tasks:
           task.cancel()
       # Wait until all worker tasks are cancelled.
       await asyncio.gather(*tasks, return_exceptions=True)

       print('====')
       print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
       print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


   asyncio.run(main())