/*
 * Definitions for the 'struct ptr_ring' data structure.
 *
 * Author:
 *	Michael S. Tsirkin <mst@redhat.com>
 *
 * Copyright (C) 2016 Red Hat, Inc.
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2 of the License, or (at your
 * option) any later version.
 *
 * This is a limited-size FIFO maintaining pointers in FIFO order, with
 * one CPU producing entries and another CPU consuming them.
 *
 * This implementation tries to minimize cache contention when there is a
 * single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
	int producer ____cacheline_aligned_in_smp;
	spinlock_t producer_lock;
	int consumer ____cacheline_aligned_in_smp;
	spinlock_t consumer_lock;
	/* Shared consumer/producer data */
	/* Read-only by both the producer and the consumer */
	int size ____cacheline_aligned_in_smp; /* max entries in queue */
	void **queue;
};

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). If the ring is ever resized, callers must hold
 * producer_lock - see e.g. ptr_ring_full. Otherwise, if callers don't hold
 * producer_lock, the next call to __ptr_ring_produce may fail.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
	return r->queue[r->producer];
}

static inline bool ptr_ring_full(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_full(r);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 * Callers are responsible for making sure the pointer being queued
 * points to valid data.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	if (unlikely(!r->size) || r->queue[r->producer])
		return -ENOSPC;

	/* Make sure the pointer we are storing points to valid data. */
	/* Pairs with smp_read_barrier_depends in __ptr_ring_consume. */
	smp_wmb();

	r->queue[r->producer++] = ptr;
	if (unlikely(r->producer >= r->size))
		r->producer = 0;
	return 0;
}
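
/* Example (illustrative sketch, not part of the API): a producer that
 * already holds producer_lock and wants to wait for a free slot could
 * retry in a loop, using cpu_relax() as the compiler barrier the note
 * above asks for:
 *
 *	while (__ptr_ring_produce(r, ptr) == -ENOSPC)
 *		cpu_relax();
 *
 * Spinning like this only terminates if a consumer runs concurrently
 * (under consumer_lock) and frees up slots.
 */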

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}
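
/* Example (illustrative sketch): choosing a produce variant. If the
 * consumer side of this ring runs in BH context (e.g. a NAPI poll
 * handler calling ptr_ring_consume_bh()), the nesting rule above means
 * a process-context producer must disable BH while taking
 * producer_lock. A hypothetical helper might look like:
 *
 *	static int queue_skb(struct ptr_ring *r, struct sk_buff *skb)
 *	{
 *		return ptr_ring_produce_bh(r, skb);
 *	}
 *
 * ptr_ring_produce_any() is the conservative choice when the calling
 * context is not known in advance.
 */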

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL.
 * If the ring is never resized, and if the pointer is merely
 * tested, there's no need to take the lock - see e.g. __ptr_ring_empty.
 */
static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
	if (likely(r->size))
		return r->queue[r->consumer];
	return NULL;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if the ring is ever resized - see e.g. ptr_ring_empty.
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
	return !__ptr_ring_peek(r);
}
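
/* Example (illustrative sketch): a single consumer that never resizes
 * the ring may poll for work without taking consumer_lock, as the note
 * above allows, again using cpu_relax() as the compiler barrier:
 *
 *	while (__ptr_ring_empty(r))
 *		cpu_relax();
 *
 * Once the ring is non-empty, the entry is then dequeued with
 * ptr_ring_consume() or one of its locking variants.
 */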

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_empty(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}

/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
	r->queue[r->consumer++] = NULL;
	if (unlikely(r->consumer >= r->size))
		r->consumer = 0;
}

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	ptr = __ptr_ring_peek(r);
	if (ptr)
		__ptr_ring_discard_one(r);

	/* Make sure anyone accessing data through the pointer is up to date. */
	/* Pairs with smp_wmb in __ptr_ring_produce. */
	smp_read_barrier_depends();
	return ptr;
}
258
Michael S. Tsirkin7c560122017-02-19 07:17:17 +0200259/*
260 * Note: resize (below) nests producer lock within consumer lock, so if you
261 * call this in interrupt or BH context, you must disable interrupts/BH when
262 * producing.
263 */
Michael S. Tsirkin2e0ab8c2016-06-13 23:54:31 +0300264static inline void *ptr_ring_consume(struct ptr_ring *r)
265{
266 void *ptr;
267
268 spin_lock(&r->consumer_lock);
269 ptr = __ptr_ring_consume(r);
270 spin_unlock(&r->consumer_lock);
271
272 return ptr;
273}
274
275static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
276{
277 void *ptr;
278
279 spin_lock_irq(&r->consumer_lock);
280 ptr = __ptr_ring_consume(r);
281 spin_unlock_irq(&r->consumer_lock);
282
283 return ptr;
284}
285
286static inline void *ptr_ring_consume_any(struct ptr_ring *r)
287{
288 unsigned long flags;
289 void *ptr;
290
291 spin_lock_irqsave(&r->consumer_lock, flags);
292 ptr = __ptr_ring_consume(r);
293 spin_unlock_irqrestore(&r->consumer_lock, flags);
294
295 return ptr;
296}
297
298static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
299{
300 void *ptr;
301
302 spin_lock_bh(&r->consumer_lock);
303 ptr = __ptr_ring_consume(r);
304 spin_unlock_bh(&r->consumer_lock);
305
306 return ptr;
307}
308
309/* Cast to structure type and call a function without discarding from FIFO.
310 * Function must return a value.
311 * Callers must take consumer_lock.
312 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	unsigned long __PTR_RING_PEEK_CALL_f; \
	\
	spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v; \
})
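
/* Example (illustrative sketch): peeking at the head entry without
 * consuming it, for a struct ptr_ring *ring holding struct sk_buff
 * pointers. The function passed in must accept the raw void *
 * (which is NULL when the ring is empty) and return a value; the
 * hypothetical helper below reports the length of the queued skb:
 *
 *	static int peek_len(void *ptr)
 *	{
 *		struct sk_buff *skb = ptr;
 *
 *		return skb ? skb->len : 0;
 *	}
 *
 *	int len = PTR_RING_PEEK_CALL(ring, peek_len);
 *
 * PTR_RING_PEEK_CALL_BH/_IRQ/_ANY pick the consumer_lock flavour to
 * match the contexts the ring is consumed from.
 */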

static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
{
	if (size > KMALLOC_MAX_SIZE / sizeof(void *))
		return NULL;
	return kcalloc(size, sizeof(void *), gfp);
}

static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
{
	r->queue = __ptr_ring_init_queue_alloc(size, gfp);
	if (!r->queue)
		return -ENOMEM;

	r->size = size;
	r->producer = r->consumer = 0;
	spin_lock_init(&r->producer_lock);
	spin_lock_init(&r->consumer_lock);

	return 0;
}
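
/* Example (illustrative sketch): typical lifecycle of a ring, assuming
 * process-context callers and a caller-supplied free_item() destructor
 * (both hypothetical names):
 *
 *	struct ptr_ring ring;
 *	int err = ptr_ring_init(&ring, 128, GFP_KERNEL);
 *
 *	if (err)
 *		return err;
 *	if (ptr_ring_produce(&ring, item))
 *		...	ring full, item was not queued
 *	item = ptr_ring_consume(&ring);		(NULL when empty)
 *	...
 *	ptr_ring_cleanup(&ring, free_item);
 *
 * Note that a NULL entry is indistinguishable from an empty slot - the
 * ring relies on NULL slots to detect empty and full - so callers must
 * never produce a NULL pointer.
 */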

static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
					   int size, gfp_t gfp,
					   void (*destroy)(void *))
{
	int producer = 0;
	void **old;
	void *ptr;

	while ((ptr = __ptr_ring_consume(r)))
		if (producer < size)
			queue[producer++] = ptr;
		else if (destroy)
			destroy(ptr);

	if (producer >= size)
		producer = 0;
	r->size = size;
	r->producer = producer;
	r->consumer = 0;
	old = r->queue;
	r->queue = queue;

	return old;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
				  void (*destroy)(void *))
{
	unsigned long flags;
	void **queue = __ptr_ring_init_queue_alloc(size, gfp);
	void **old;

	if (!queue)
		return -ENOMEM;

	spin_lock_irqsave(&(r)->consumer_lock, flags);
	spin_lock(&(r)->producer_lock);

	old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

	spin_unlock(&(r)->producer_lock);
	spin_unlock_irqrestore(&(r)->consumer_lock, flags);

	kfree(old);

	return 0;
}
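
/* Example (illustrative sketch): resizing a ring at runtime. Entries
 * that do not fit into the new queue are passed to the destroy callback
 * rather than leaked:
 *
 *	err = ptr_ring_resize(&ring, 256, GFP_KERNEL, free_item);
 *
 * free_item() is the hypothetical caller-supplied destructor from the
 * ptr_ring_init() example above. The swap happens under both locks, so
 * callers need not quiesce producers or consumers first; they simply
 * block on the locks for the duration of the swap.
 */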

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
					   unsigned int nrings,
					   int size,
					   gfp_t gfp, void (*destroy)(void *))
{
	unsigned long flags;
	void ***queues;
	int i;

	queues = kmalloc_array(nrings, sizeof(*queues), gfp);
	if (!queues)
		goto noqueues;

	for (i = 0; i < nrings; ++i) {
		queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
		if (!queues[i])
			goto nomem;
	}

	for (i = 0; i < nrings; ++i) {
		spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
		spin_lock(&(rings[i])->producer_lock);
		queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
						  size, gfp, destroy);
		spin_unlock(&(rings[i])->producer_lock);
		spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
	}

	for (i = 0; i < nrings; ++i)
		kfree(queues[i]);

	kfree(queues);

	return 0;

nomem:
	while (--i >= 0)
		kfree(queues[i]);

	kfree(queues);

noqueues:
	return -ENOMEM;
}

static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
	void *ptr;

	if (destroy)
		while ((ptr = ptr_ring_consume(r)))
			destroy(ptr);
	kfree(r->queue);
}

#endif /* _LINUX_PTR_RING_H */