blob: 7be1023c80cc66ceb4126b6753cf128061725ad5 [file] [log] [blame]
Victor Stinner615a58e2015-02-25 13:55:43 +01001.. currentmodule:: asyncio
2
Yury Selivanov6c731642018-09-14 14:57:39 -07003.. _asyncio-queues:
Yury Selivanov7c7605f2018-09-11 09:54:40 -07004
5======
Victor Stinner615a58e2015-02-25 13:55:43 +01006Queues
7======
8
Yury Selivanov7c7605f2018-09-11 09:54:40 -07009asyncio queues are designed to be similar to classes of the
10:mod:`queue` module. Although asyncio queues are not thread-safe,
11they are designed to be used specifically in async/await code.
lf627d2c82017-07-25 17:03:51 -060012
Elvis Pranskevichus1fa2ec42018-09-17 19:16:44 -040013Note that methods of asyncio queues don't have a *timeout* parameter;
Yury Selivanov7c7605f2018-09-11 09:54:40 -070014use :func:`asyncio.wait_for` function to do queue operations with a
15timeout.
Victor Stinner615a58e2015-02-25 13:55:43 +010016
Yury Selivanov7c7605f2018-09-11 09:54:40 -070017See also the `Examples`_ section below.
Victor Stinner615a58e2015-02-25 13:55:43 +010018
19Queue
Yury Selivanov7c7605f2018-09-11 09:54:40 -070020=====
Victor Stinner615a58e2015-02-25 13:55:43 +010021
22.. class:: Queue(maxsize=0, \*, loop=None)
23
Yury Selivanov7c7605f2018-09-11 09:54:40 -070024 A first in, first out (FIFO) queue.
Victor Stinner615a58e2015-02-25 13:55:43 +010025
Yury Selivanov7c7605f2018-09-11 09:54:40 -070026 If *maxsize* is less than or equal to zero, the queue size is
27 infinite. If it is an integer greater than ``0``, then
28 ``await put()`` blocks when the queue reaches *maxsize*
29 until an item is removed by :meth:`get`.
Victor Stinner615a58e2015-02-25 13:55:43 +010030
Yury Selivanov7c7605f2018-09-11 09:54:40 -070031 Unlike the standard library threading :mod:`queue`, the size of
32 the queue is always known and can be returned by calling the
33 :meth:`qsize` method.
Victor Stinner615a58e2015-02-25 13:55:43 +010034
Victor Stinner83704962015-02-25 14:24:15 +010035 This class is :ref:`not thread safe <asyncio-multithreading>`.
36
Yury Selivanov7c7605f2018-09-11 09:54:40 -070037 .. attribute:: maxsize
38
39 Number of items allowed in the queue.
Victor Stinner615a58e2015-02-25 13:55:43 +010040
41 .. method:: empty()
42
43 Return ``True`` if the queue is empty, ``False`` otherwise.
44
45 .. method:: full()
46
47 Return ``True`` if there are :attr:`maxsize` items in the queue.
48
Yury Selivanov7c7605f2018-09-11 09:54:40 -070049 If the queue was initialized with ``maxsize=0`` (the default),
50 then :meth:`full()` never returns ``True``.
Victor Stinner615a58e2015-02-25 13:55:43 +010051
52 .. coroutinemethod:: get()
53
Yury Selivanov7c7605f2018-09-11 09:54:40 -070054 Remove and return an item from the queue. If queue is empty,
55 wait until an item is available.
Victor Stinner615a58e2015-02-25 13:55:43 +010056
57 .. method:: get_nowait()
58
Victor Stinner615a58e2015-02-25 13:55:43 +010059 Return an item if one is immediately available, else raise
60 :exc:`QueueEmpty`.
61
62 .. coroutinemethod:: join()
63
Carol Willingc9d66f02018-09-14 10:06:55 -070064 Block until all items in the queue have been received and processed.
Victor Stinner615a58e2015-02-25 13:55:43 +010065
Yury Selivanov7c7605f2018-09-11 09:54:40 -070066 The count of unfinished tasks goes up whenever an item is added
Slam97e12992019-01-17 13:52:17 +020067 to the queue. The count goes down whenever a consumer coroutine calls
Yury Selivanov7c7605f2018-09-11 09:54:40 -070068 :meth:`task_done` to indicate that the item was retrieved and all
69 work on it is complete. When the count of unfinished tasks drops
70 to zero, :meth:`join` unblocks.
Victor Stinner615a58e2015-02-25 13:55:43 +010071
72 .. coroutinemethod:: put(item)
73
Yury Selivanov7c7605f2018-09-11 09:54:40 -070074 Put an item into the queue. If the queue is full, wait until a
Elvis Pranskevichus1fa2ec42018-09-17 19:16:44 -040075 free slot is available before adding the item.
Victor Stinner615a58e2015-02-25 13:55:43 +010076
77 .. method:: put_nowait(item)
78
79 Put an item into the queue without blocking.
80
81 If no free slot is immediately available, raise :exc:`QueueFull`.
82
83 .. method:: qsize()
84
Elvis Pranskevichus1fa2ec42018-09-17 19:16:44 -040085 Return the number of items in the queue.
Victor Stinner615a58e2015-02-25 13:55:43 +010086
87 .. method:: task_done()
88
89 Indicate that a formerly enqueued task is complete.
90
Yury Selivanov7c7605f2018-09-11 09:54:40 -070091 Used by queue consumers. For each :meth:`~Queue.get` used to
92 fetch a task, a subsequent call to :meth:`task_done` tells the
93 queue that the processing on the task is complete.
Victor Stinner615a58e2015-02-25 13:55:43 +010094
Yury Selivanov7c7605f2018-09-11 09:54:40 -070095 If a :meth:`join` is currently blocking, it will resume when all
96 items have been processed (meaning that a :meth:`task_done`
97 call was received for every item that had been :meth:`~Queue.put`
98 into the queue).
Victor Stinner615a58e2015-02-25 13:55:43 +010099
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700100 Raises :exc:`ValueError` if called more times than there were
101 items placed in the queue.
Victor Stinner615a58e2015-02-25 13:55:43 +0100102
103
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700104Priority Queue
105==============
Victor Stinner615a58e2015-02-25 13:55:43 +0100106
107.. class:: PriorityQueue
108
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700109 A variant of :class:`Queue`; retrieves entries in priority order
110 (lowest first).
Victor Stinner615a58e2015-02-25 13:55:43 +0100111
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700112 Entries are typically tuples of the form
113 ``(priority_number, data)``.
Victor Stinner615a58e2015-02-25 13:55:43 +0100114
115
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700116LIFO Queue
117==========
Victor Stinner615a58e2015-02-25 13:55:43 +0100118
119.. class:: LifoQueue
120
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700121 A variant of :class:`Queue` that retrieves most recently added
122 entries first (last in, first out).
Victor Stinner615a58e2015-02-25 13:55:43 +0100123
124
Victor Stinner615a58e2015-02-25 13:55:43 +0100125Exceptions
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700126==========
Victor Stinner615a58e2015-02-25 13:55:43 +0100127
128.. exception:: QueueEmpty
129
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700130 This exception is raised when the :meth:`~Queue.get_nowait` method
131 is called on an empty queue.
Victor Stinner615a58e2015-02-25 13:55:43 +0100132
133
134.. exception:: QueueFull
135
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700136 Exception raised when the :meth:`~Queue.put_nowait` method is called
137 on a queue that has reached its *maxsize*.
138
139
140Examples
141========
142
Yury Selivanov7372c3b2018-09-14 15:11:24 -0700143.. _asyncio_example_queue_dist:
144
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700145Queues can be used to distribute workload between several
146concurrent tasks::
147
148 import asyncio
149 import random
150 import time
151
152
153 async def worker(name, queue):
154 while True:
155 # Get a "work item" out of the queue.
156 sleep_for = await queue.get()
157
158 # Sleep for the "sleep_for" seconds.
159 await asyncio.sleep(sleep_for)
160
161 # Notify the queue that the "work item" has been processed.
162 queue.task_done()
163
164 print(f'{name} has slept for {sleep_for:.2f} seconds')
165
166
167 async def main():
168 # Create a queue that we will use to store our "workload".
169 queue = asyncio.Queue()
170
171 # Generate random timings and put them into the queue.
172 total_sleep_time = 0
173 for _ in range(20):
174 sleep_for = random.uniform(0.05, 1.0)
175 total_sleep_time += sleep_for
176 queue.put_nowait(sleep_for)
177
178 # Create three worker tasks to process the queue concurrently.
179 tasks = []
180 for i in range(3):
181 task = asyncio.create_task(worker(f'worker-{i}', queue))
182 tasks.append(task)
183
184 # Wait until the queue is fully processed.
185 started_at = time.monotonic()
186 await queue.join()
187 total_slept_for = time.monotonic() - started_at
188
189 # Cancel our worker tasks.
190 for task in tasks:
191 task.cancel()
192 # Wait until all worker tasks are cancelled.
193 await asyncio.gather(*tasks, return_exceptions=True)
194
195 print('====')
196 print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
197 print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
198
199
200 asyncio.run(main())