.. currentmodule:: asyncio

.. _asyncio-queues:

======
Queues
======

asyncio queues are designed to be similar to classes of the
:mod:`queue` module.  Although asyncio queues are not thread-safe,
they are designed to be used specifically in async/await code.

Note that methods of asyncio queues don't have a *timeout* parameter;
use the :func:`asyncio.wait_for` function to do queue operations with a
timeout.

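A minimal sketch of a queue operation with a timeout, wrapping
``queue.get()`` in ``asyncio.wait_for``.  The helper name
``get_with_timeout`` and the 0.1 second timeout are illustrative
assumptions, not part of the asyncio API:

```python
import asyncio


async def get_with_timeout(queue, timeout):
    """Hypothetical helper: return the next item, or None on timeout."""
    try:
        # wait_for() cancels the pending get() if the timeout expires.
        return await asyncio.wait_for(queue.get(), timeout)
    except asyncio.TimeoutError:
        return None


async def main():
    queue = asyncio.Queue()

    # The queue is empty, so this get() times out after 0.1 seconds.
    first = await get_with_timeout(queue, 0.1)

    # Once an item is queued, get() returns it without waiting.
    queue.put_nowait('job')
    second = await get_with_timeout(queue, 0.1)
    return first, second


result = asyncio.run(main())
print(result)
```

Returning ``None`` on timeout is a simplification that only works when
``None`` is never a valid queue item; re-raising the exception or
returning a dedicated sentinel may be preferable in real code.
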
See also the `Examples`_ section below.

Queue
=====

.. class:: Queue(maxsize=0, \*, loop=None)

   A first in, first out (FIFO) queue.

   If *maxsize* is less than or equal to zero, the queue size is
   infinite.  If it is an integer greater than ``0``, then
   ``await put()`` blocks when the queue reaches *maxsize*
   until an item is removed by :meth:`get`.

   Unlike the standard library threading :mod:`queue`, the size of
   the queue is always known and can be returned by calling the
   :meth:`qsize` method.

   .. deprecated-removed:: 3.8 3.10
      The *loop* parameter.


   This class is :ref:`not thread safe <asyncio-multithreading>`.

   .. attribute:: maxsize

      Number of items allowed in the queue.

   .. method:: empty()

      Return ``True`` if the queue is empty, ``False`` otherwise.

   .. method:: full()

      Return ``True`` if there are :attr:`maxsize` items in the queue.

      If the queue was initialized with ``maxsize=0`` (the default),
      then :meth:`full()` never returns ``True``.

   .. coroutinemethod:: get()

      Remove and return an item from the queue.  If the queue is empty,
      wait until an item is available.

   .. method:: get_nowait()

      Return an item if one is immediately available, else raise
      :exc:`QueueEmpty`.

   .. coroutinemethod:: join()

      Block until all items in the queue have been received and processed.

      The count of unfinished tasks goes up whenever an item is added
      to the queue.  The count goes down whenever a consumer coroutine calls
      :meth:`task_done` to indicate that the item was retrieved and all
      work on it is complete.  When the count of unfinished tasks drops
      to zero, :meth:`join` unblocks.

   .. coroutinemethod:: put(item)

      Put an item into the queue.  If the queue is full, wait until a
      free slot is available before adding the item.

   .. method:: put_nowait(item)

      Put an item into the queue without blocking.

      If no free slot is immediately available, raise :exc:`QueueFull`.

   .. method:: qsize()

      Return the number of items in the queue.

   .. method:: task_done()

      Indicate that a formerly enqueued task is complete.

      Used by queue consumers.  For each :meth:`~Queue.get` used to
      fetch a task, a subsequent call to :meth:`task_done` tells the
      queue that the processing on the task is complete.

      If a :meth:`join` is currently blocking, it will resume when all
      items have been processed (meaning that a :meth:`task_done`
      call was received for every item that had been :meth:`~Queue.put`
      into the queue).

      Raises :exc:`ValueError` if called more times than there were
      items placed in the queue.


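The interaction between *maxsize*, ``put()``, ``full()`` and ``qsize()``
can be sketched with a bounded queue.  The ``producer`` and ``consumer``
coroutines, the ``maxsize=2`` limit and the item count are assumptions
chosen for illustration:

```python
import asyncio


async def producer(queue, count):
    for i in range(count):
        # With maxsize=2, put() waits whenever two items are already queued.
        await queue.put(i)


async def consumer(queue, received):
    while True:
        item = await queue.get()
        received.append(item)
        # Tell the queue that this item is fully processed.
        queue.task_done()


async def main():
    queue = asyncio.Queue(maxsize=2)
    received = []

    # Start only the producer and give it a chance to run: it adds
    # items until the queue is full, then waits inside put().
    producer_task = asyncio.create_task(producer(queue, 5))
    await asyncio.sleep(0)
    filled = (queue.qsize(), queue.full())

    # Start the consumer; each get() frees a slot for the producer.
    consumer_task = asyncio.create_task(consumer(queue, received))
    await producer_task
    await queue.join()

    # The consumer loops forever, so cancel it once the work is done.
    consumer_task.cancel()
    await asyncio.gather(consumer_task, return_exceptions=True)
    return filled, received


filled, received = asyncio.run(main())
print(filled, received)
```

With the default event loop, the snapshot taken before the consumer
starts should show a full queue of two items, and all five items are
received in FIFO order.
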
Priority Queue
==============

.. class:: PriorityQueue

   A variant of :class:`Queue`; retrieves entries in priority order
   (lowest first).

   Entries are typically tuples of the form
   ``(priority_number, data)``.


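As a small illustration of priority ordering, the sketch below queues
``(priority_number, data)`` tuples out of order and retrieves them
lowest priority number first.  The task names are made up for the
example:

```python
import asyncio


async def main():
    queue = asyncio.PriorityQueue()

    # Entries are (priority_number, data) tuples, added in arbitrary order.
    queue.put_nowait((2, 'write report'))
    queue.put_nowait((1, 'fix outage'))
    queue.put_nowait((3, 'tidy desk'))

    order = []
    while not queue.empty():
        priority, data = await queue.get()
        order.append(data)
        queue.task_done()
    return order


order = asyncio.run(main())
print(order)
```

Because entries are compared as whole tuples, two entries with the same
priority number fall back to comparing their data, so the data must be
comparable in that case.
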
LIFO Queue
==========

.. class:: LifoQueue

   A variant of :class:`Queue` that retrieves most recently added
   entries first (last in, first out).


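A brief sketch of last in, first out retrieval; the item names are
placeholders:

```python
import asyncio


async def main():
    queue = asyncio.LifoQueue()
    for item in ('first', 'second', 'third'):
        queue.put_nowait(item)

    # Items come back in the reverse of the order they were added.
    order = []
    while not queue.empty():
        order.append(await queue.get())
        queue.task_done()
    return order


order = asyncio.run(main())
print(order)
```
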
Exceptions
==========

.. exception:: QueueEmpty

   This exception is raised when the :meth:`~Queue.get_nowait` method
   is called on an empty queue.


.. exception:: QueueFull

   Exception raised when the :meth:`~Queue.put_nowait` method is called
   on a queue that has reached its *maxsize*.


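Both exceptions can be observed with the non-blocking methods.  The
following sketch assumes a queue limited to a single item and records
what happens in a list named ``events`` (an illustrative name):

```python
import asyncio


async def main():
    queue = asyncio.Queue(maxsize=1)
    events = []

    # Nothing has been queued yet, so get_nowait() cannot return an item.
    try:
        queue.get_nowait()
    except asyncio.QueueEmpty:
        events.append('empty')

    # The first put fills the only slot; the second one has no room.
    queue.put_nowait('only item')
    try:
        queue.put_nowait('one too many')
    except asyncio.QueueFull:
        events.append('full')

    # The item that was accepted is still retrievable.
    events.append(queue.get_nowait())
    return events


events = asyncio.run(main())
print(events)
```
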
Examples
========

.. _asyncio_example_queue_dist:

Queues can be used to distribute workload between several
concurrent tasks::

   import asyncio
   import random
   import time


   async def worker(name, queue):
       while True:
           # Get a "work item" out of the queue.
           sleep_for = await queue.get()

           # Sleep for the "sleep_for" seconds.
           await asyncio.sleep(sleep_for)

           # Notify the queue that the "work item" has been processed.
           queue.task_done()

           print(f'{name} has slept for {sleep_for:.2f} seconds')


   async def main():
       # Create a queue that we will use to store our "workload".
       queue = asyncio.Queue()

       # Generate random timings and put them into the queue.
       total_sleep_time = 0
       for _ in range(20):
           sleep_for = random.uniform(0.05, 1.0)
           total_sleep_time += sleep_for
           queue.put_nowait(sleep_for)

       # Create three worker tasks to process the queue concurrently.
       tasks = []
       for i in range(3):
           task = asyncio.create_task(worker(f'worker-{i}', queue))
           tasks.append(task)

       # Wait until the queue is fully processed.
       started_at = time.monotonic()
       await queue.join()
       total_slept_for = time.monotonic() - started_at

       # Cancel our worker tasks.
       for task in tasks:
           task.cancel()
       # Wait until all worker tasks are cancelled.
       await asyncio.gather(*tasks, return_exceptions=True)

       print('====')
       print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
       print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


   asyncio.run(main())