1use std::cell::{Cell, UnsafeCell};
2use std::cmp;
3use std::fmt;
4use std::iter::FromIterator;
5use std::marker::PhantomData;
6use std::mem::{self, MaybeUninit};
7use std::ptr;
8use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
9use std::sync::Arc;
10
11use crate::epoch::{self, Atomic, Owned};
12use crate::utils::{Backoff, CachePadded};
13
14// Minimum buffer capacity.
15const MIN_CAP: usize = 64;
16// Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`.
17const MAX_BATCH: usize = 32;
18// If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets
19// deallocated as soon as possible.
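// For example, with eight-byte tasks this threshold corresponds to a capacity of 128 slots
// (8 * 128 = 1024 bytes).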
20const FLUSH_THRESHOLD_BYTES: usize = 1 << 10;
21
22/// A buffer that holds tasks in a worker queue.
23///
24/// This is just a pointer to the buffer and its capacity - dropping an instance of this struct will
25/// *not* deallocate the buffer.
26struct Buffer<T> {
27 /// Pointer to the allocated memory.
28 ptr: *mut T,
29
30 /// Capacity of the buffer. Always a power of two.
31 cap: usize,
32}
33
34unsafe impl<T> Send for Buffer<T> {}
35
36impl<T> Buffer<T> {
37 /// Allocates a new buffer with the specified capacity.
38 fn alloc(cap: usize) -> Buffer<T> {
39 debug_assert_eq!(cap, cap.next_power_of_two());
40
41 let mut v = Vec::with_capacity(cap);
42 let ptr = v.as_mut_ptr();
43 mem::forget(v);
44
45 Buffer { ptr, cap }
46 }
47
48 /// Deallocates the buffer.
49 unsafe fn dealloc(self) {
50 drop(Vec::from_raw_parts(self.ptr, 0, self.cap));
51 }
52
53 /// Returns a pointer to the task at the specified `index`.
54 unsafe fn at(&self, index: isize) -> *mut T {
55 // `self.cap` is always a power of two.
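 // For example, with `cap == 64`, logical indices 1, 65, and 129 all map to the
 // same physical slot, so the buffer is effectively circular.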
56 self.ptr.offset(index & (self.cap - 1) as isize)
57 }
58
59 /// Writes `task` into the specified `index`.
60 ///
61 /// This method might be concurrently called with another `read` at the same index, which is
62 /// technically speaking a data race and therefore UB. We should use an atomic store here, but
63 /// that would be more expensive and difficult to implement generically for all types `T`.
64 /// Hence, as a hack, we use a volatile write instead.
65 unsafe fn write(&self, index: isize, task: T) {
66 ptr::write_volatile(self.at(index), task)
67 }
68
69 /// Reads a task from the specified `index`.
70 ///
71 /// This method might be concurrently called with another `write` at the same index, which is
72 /// technically speaking a data race and therefore UB. We should use an atomic load here, but
73 /// that would be more expensive and difficult to implement generically for all types `T`.
74 /// Hence, as a hack, we use a volatile read instead.
75 unsafe fn read(&self, index: isize) -> T {
76 ptr::read_volatile(self.at(index))
77 }
78}
79
80impl<T> Clone for Buffer<T> {
81 fn clone(&self) -> Buffer<T> {
82 Buffer {
83 ptr: self.ptr,
84 cap: self.cap,
85 }
86 }
87}
88
89impl<T> Copy for Buffer<T> {}
90
91/// Internal queue data shared between the worker and stealers.
92///
93/// The implementation is based on the following work:
94///
95/// 1. [Chase and Lev. Dynamic circular work-stealing deque. SPAA 2005.][chase-lev]
96/// 2. [Le, Pop, Cohen, and Nardelli. Correct and efficient work-stealing for weak memory models.
97/// PPoPP 2013.][weak-mem]
98/// 3. [Norris and Demsky. CDSchecker: checking concurrent data structures written with C/C++
99/// atomics. OOPSLA 2013.][checker]
100///
101/// [chase-lev]: https://dl.acm.org/citation.cfm?id=1073974
102/// [weak-mem]: https://dl.acm.org/citation.cfm?id=2442524
103/// [checker]: https://dl.acm.org/citation.cfm?id=2509514
104struct Inner<T> {
105 /// The front index.
106 front: AtomicIsize,
107
108 /// The back index.
109 back: AtomicIsize,
110
111 /// The underlying buffer.
112 buffer: CachePadded<Atomic<Buffer<T>>>,
113}
114
115impl<T> Drop for Inner<T> {
116 fn drop(&mut self) {
117 // Load the back index, front index, and buffer.
118 let b = self.back.load(Ordering::Relaxed);
119 let f = self.front.load(Ordering::Relaxed);
120
121 unsafe {
122 let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected());
123
124 // Go through the buffer from front to back and drop all tasks in the queue.
125 let mut i = f;
126 while i != b {
127 buffer.deref().at(i).drop_in_place();
128 i = i.wrapping_add(1);
129 }
130
131 // Free the memory allocated by the buffer.
132 buffer.into_owned().into_box().dealloc();
133 }
134 }
135}
136
137/// Worker queue flavor: FIFO or LIFO.
138#[derive(Clone, Copy, Debug, Eq, PartialEq)]
139enum Flavor {
140 /// The first-in first-out flavor.
141 Fifo,
142
143 /// The last-in first-out flavor.
144 Lifo,
145}
146
147/// A worker queue.
148///
149/// This is a FIFO or LIFO queue that is owned by a single thread, but other threads may steal
150/// tasks from it. Task schedulers typically create a single worker queue per thread.
151///
152/// # Examples
153///
154/// A FIFO worker:
155///
156/// ```
157/// use crossbeam_deque::{Steal, Worker};
158///
159/// let w = Worker::new_fifo();
160/// let s = w.stealer();
161///
162/// w.push(1);
163/// w.push(2);
164/// w.push(3);
165///
166/// assert_eq!(s.steal(), Steal::Success(1));
167/// assert_eq!(w.pop(), Some(2));
168/// assert_eq!(w.pop(), Some(3));
169/// ```
170///
171/// A LIFO worker:
172///
173/// ```
174/// use crossbeam_deque::{Steal, Worker};
175///
176/// let w = Worker::new_lifo();
177/// let s = w.stealer();
178///
179/// w.push(1);
180/// w.push(2);
181/// w.push(3);
182///
183/// assert_eq!(s.steal(), Steal::Success(1));
184/// assert_eq!(w.pop(), Some(3));
185/// assert_eq!(w.pop(), Some(2));
186/// ```
187pub struct Worker<T> {
188 /// A reference to the inner representation of the queue.
189 inner: Arc<CachePadded<Inner<T>>>,
190
191 /// A copy of `inner.buffer` for quick access.
192 buffer: Cell<Buffer<T>>,
193
194 /// The flavor of the queue.
195 flavor: Flavor,
196
197 /// Indicates that the worker cannot be shared among threads.
198 _marker: PhantomData<*mut ()>, // !Send + !Sync
199}
200
201unsafe impl<T: Send> Send for Worker<T> {}
202
203impl<T> Worker<T> {
204 /// Creates a FIFO worker queue.
205 ///
206 /// Tasks are pushed and popped from opposite ends.
207 ///
208 /// # Examples
209 ///
210 /// ```
211 /// use crossbeam_deque::Worker;
212 ///
213 /// let w = Worker::<i32>::new_fifo();
214 /// ```
215 pub fn new_fifo() -> Worker<T> {
216 let buffer = Buffer::alloc(MIN_CAP);
217
218 let inner = Arc::new(CachePadded::new(Inner {
219 front: AtomicIsize::new(0),
220 back: AtomicIsize::new(0),
221 buffer: CachePadded::new(Atomic::new(buffer)),
222 }));
223
224 Worker {
225 inner,
226 buffer: Cell::new(buffer),
227 flavor: Flavor::Fifo,
228 _marker: PhantomData,
229 }
230 }
231
232 /// Creates a LIFO worker queue.
233 ///
234 /// Tasks are pushed and popped from the same end.
235 ///
236 /// # Examples
237 ///
238 /// ```
239 /// use crossbeam_deque::Worker;
240 ///
241 /// let w = Worker::<i32>::new_lifo();
242 /// ```
243 pub fn new_lifo() -> Worker<T> {
244 let buffer = Buffer::alloc(MIN_CAP);
245
246 let inner = Arc::new(CachePadded::new(Inner {
247 front: AtomicIsize::new(0),
248 back: AtomicIsize::new(0),
249 buffer: CachePadded::new(Atomic::new(buffer)),
250 }));
251
252 Worker {
253 inner,
254 buffer: Cell::new(buffer),
255 flavor: Flavor::Lifo,
256 _marker: PhantomData,
257 }
258 }
259
260 /// Creates a stealer for this queue.
261 ///
262 /// The returned stealer can be shared among threads and cloned.
263 ///
264 /// # Examples
265 ///
266 /// ```
267 /// use crossbeam_deque::Worker;
268 ///
269 /// let w = Worker::<i32>::new_lifo();
270 /// let s = w.stealer();
271 /// ```
272 pub fn stealer(&self) -> Stealer<T> {
273 Stealer {
274 inner: self.inner.clone(),
275 flavor: self.flavor,
276 }
277 }
278
279 /// Resizes the internal buffer to the new capacity of `new_cap`.
280 #[cold]
281 unsafe fn resize(&self, new_cap: usize) {
282 // Load the back index, front index, and buffer.
283 let b = self.inner.back.load(Ordering::Relaxed);
284 let f = self.inner.front.load(Ordering::Relaxed);
285 let buffer = self.buffer.get();
286
287 // Allocate a new buffer and copy data from the old buffer to the new one.
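 // Since `new.at(i)` masks the logical index with the new capacity, a task may land in a
 // different physical slot than it occupied in the old buffer, but its logical index `i`
 // stays the same.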
288 let new = Buffer::alloc(new_cap);
289 let mut i = f;
290 while i != b {
291 ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1);
292 i = i.wrapping_add(1);
293 }
294
295 let guard = &epoch::pin();
296
297 // Replace the old buffer with the new one.
298 self.buffer.replace(new);
299 let old =
300 self.inner
301 .buffer
302 .swap(Owned::new(new).into_shared(guard), Ordering::Release, guard);
303
304 // Destroy the old buffer later.
305 guard.defer_unchecked(move || old.into_owned().into_box().dealloc());
306
307 // If the buffer is very large, then flush the thread-local garbage in order to deallocate
308 // it as soon as possible.
309 if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES {
310 guard.flush();
311 }
312 }
313
314 /// Reserves enough capacity so that `reserve_cap` tasks can be pushed without growing the
315 /// buffer.
316 fn reserve(&self, reserve_cap: usize) {
317 if reserve_cap > 0 {
318 // Compute the current length.
319 let b = self.inner.back.load(Ordering::Relaxed);
320 let f = self.inner.front.load(Ordering::SeqCst);
321 let len = b.wrapping_sub(f) as usize;
322
323 // The current capacity.
324 let cap = self.buffer.get().cap;
325
326 // Is there enough capacity to push `reserve_cap` tasks?
327 if cap - len < reserve_cap {
328 // Keep doubling the capacity as much as is needed.
329 let mut new_cap = cap * 2;
330 while new_cap - len < reserve_cap {
331 new_cap *= 2;
332 }
333
334 // Resize the buffer.
335 unsafe {
336 self.resize(new_cap);
337 }
338 }
339 }
340 }
341
342 /// Returns `true` if the queue is empty.
343 ///
344 /// ```
345 /// use crossbeam_deque::Worker;
346 ///
347 /// let w = Worker::new_lifo();
348 ///
349 /// assert!(w.is_empty());
350 /// w.push(1);
351 /// assert!(!w.is_empty());
352 /// ```
353 pub fn is_empty(&self) -> bool {
354 let b = self.inner.back.load(Ordering::Relaxed);
355 let f = self.inner.front.load(Ordering::SeqCst);
356 b.wrapping_sub(f) <= 0
357 }
358
359 /// Returns the number of tasks in the deque.
360 ///
361 /// ```
362 /// use crossbeam_deque::Worker;
363 ///
364 /// let w = Worker::new_lifo();
365 ///
366 /// assert_eq!(w.len(), 0);
367 /// w.push(1);
368 /// assert_eq!(w.len(), 1);
369 /// w.push(1);
370 /// assert_eq!(w.len(), 2);
371 /// ```
372 pub fn len(&self) -> usize {
373 let b = self.inner.back.load(Ordering::Relaxed);
374 let f = self.inner.front.load(Ordering::SeqCst);
375 b.wrapping_sub(f).max(0) as usize
376 }
377
378 /// Pushes a task into the queue.
379 ///
380 /// # Examples
381 ///
382 /// ```
383 /// use crossbeam_deque::Worker;
384 ///
385 /// let w = Worker::new_lifo();
386 /// w.push(1);
387 /// w.push(2);
388 /// ```
389 pub fn push(&self, task: T) {
390 // Load the back index, front index, and buffer.
391 let b = self.inner.back.load(Ordering::Relaxed);
392 let f = self.inner.front.load(Ordering::Acquire);
393 let mut buffer = self.buffer.get();
394
395 // Calculate the length of the queue.
396 let len = b.wrapping_sub(f);
397
398 // Is the queue full?
399 if len >= buffer.cap as isize {
400 // Yes. Grow the underlying buffer.
401 unsafe {
402 self.resize(2 * buffer.cap);
403 }
404 buffer = self.buffer.get();
405 }
406
407 // Write `task` into the slot.
408 unsafe {
409 buffer.write(b, task);
410 }
411
412 atomic::fence(Ordering::Release);
413
414 // Increment the back index.
415 //
416 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
417 // races because it doesn't understand fences.
418 self.inner.back.store(b.wrapping_add(1), Ordering::Release);
419 }
420
421 /// Pops a task from the queue.
422 ///
423 /// # Examples
424 ///
425 /// ```
426 /// use crossbeam_deque::Worker;
427 ///
428 /// let w = Worker::new_fifo();
429 /// w.push(1);
430 /// w.push(2);
431 ///
432 /// assert_eq!(w.pop(), Some(1));
433 /// assert_eq!(w.pop(), Some(2));
434 /// assert_eq!(w.pop(), None);
435 /// ```
436 pub fn pop(&self) -> Option<T> {
437 // Load the back and front index.
438 let b = self.inner.back.load(Ordering::Relaxed);
439 let f = self.inner.front.load(Ordering::Relaxed);
440
441 // Calculate the length of the queue.
442 let len = b.wrapping_sub(f);
443
444 // Is the queue empty?
445 if len <= 0 {
446 return None;
447 }
448
449 match self.flavor {
450 // Pop from the front of the queue.
451 Flavor::Fifo => {
452 // Try incrementing the front index to pop the task.
453 let f = self.inner.front.fetch_add(1, Ordering::SeqCst);
454 let new_f = f.wrapping_add(1);
455
456 if b.wrapping_sub(new_f) < 0 {
457 self.inner.front.store(f, Ordering::Relaxed);
458 return None;
459 }
460
461 unsafe {
462 // Read the popped task.
463 let buffer = self.buffer.get();
464 let task = buffer.read(f);
465
466 // Shrink the buffer if `len - 1` is less than one fourth of the capacity.
467 if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 {
468 self.resize(buffer.cap / 2);
469 }
470
471 Some(task)
472 }
473 }
474
475 // Pop from the back of the queue.
476 Flavor::Lifo => {
477 // Decrement the back index.
478 let b = b.wrapping_sub(1);
479 self.inner.back.store(b, Ordering::Relaxed);
480
481 atomic::fence(Ordering::SeqCst);
482
483 // Load the front index.
484 let f = self.inner.front.load(Ordering::Relaxed);
485
486 // Compute the length after the back index was decremented.
487 let len = b.wrapping_sub(f);
488
489 if len < 0 {
490 // The queue is empty. Restore the back index to its original value.
491 self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
492 None
493 } else {
494 // Read the task to be popped.
495 let buffer = self.buffer.get();
496 let mut task = unsafe { Some(buffer.read(b)) };
497
498 // Are we popping the last task from the queue?
499 if len == 0 {
500 // Try incrementing the front index.
501 if self
502 .inner
503 .front
504 .compare_exchange(
505 f,
506 f.wrapping_add(1),
507 Ordering::SeqCst,
508 Ordering::Relaxed,
509 )
510 .is_err()
511 {
512 // Failed. We didn't pop anything.
513 mem::forget(task.take());
514 }
515
516 // Restore the back index to its original value.
517 self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
518 } else {
519 // Shrink the buffer if `len` is less than one fourth of the capacity.
520 if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 {
521 unsafe {
522 self.resize(buffer.cap / 2);
523 }
524 }
525 }
526
527 task
528 }
529 }
530 }
531 }
532}
533
534impl<T> fmt::Debug for Worker<T> {
535 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
536 f.pad("Worker { .. }")
537 }
538}
539
540/// A stealer handle of a worker queue.
541///
542/// Stealers can be shared among threads.
543///
544/// Task schedulers typically have a single worker queue per worker thread.
545///
546/// # Examples
547///
548/// ```
549/// use crossbeam_deque::{Steal, Worker};
550///
551/// let w = Worker::new_lifo();
552/// w.push(1);
553/// w.push(2);
554///
555/// let s = w.stealer();
556/// assert_eq!(s.steal(), Steal::Success(1));
557/// assert_eq!(s.steal(), Steal::Success(2));
558/// assert_eq!(s.steal(), Steal::Empty);
559/// ```
560pub struct Stealer<T> {
561 /// A reference to the inner representation of the queue.
562 inner: Arc<CachePadded<Inner<T>>>,
563
564 /// The flavor of the queue.
565 flavor: Flavor,
566}
567
568unsafe impl<T: Send> Send for Stealer<T> {}
569unsafe impl<T: Send> Sync for Stealer<T> {}
570
571impl<T> Stealer<T> {
572 /// Returns `true` if the queue is empty.
573 ///
574 /// ```
575 /// use crossbeam_deque::Worker;
576 ///
577 /// let w = Worker::new_lifo();
578 /// let s = w.stealer();
579 ///
580 /// assert!(s.is_empty());
581 /// w.push(1);
582 /// assert!(!s.is_empty());
583 /// ```
584 pub fn is_empty(&self) -> bool {
585 let f = self.inner.front.load(Ordering::Acquire);
586 atomic::fence(Ordering::SeqCst);
587 let b = self.inner.back.load(Ordering::Acquire);
588 b.wrapping_sub(f) <= 0
589 }
590
591 /// Returns the number of tasks in the deque.
592 ///
593 /// ```
594 /// use crossbeam_deque::Worker;
595 ///
596 /// let w = Worker::new_lifo();
597 /// let s = w.stealer();
598 ///
599 /// assert_eq!(s.len(), 0);
600 /// w.push(1);
601 /// assert_eq!(s.len(), 1);
602 /// w.push(2);
603 /// assert_eq!(s.len(), 2);
604 /// ```
605 pub fn len(&self) -> usize {
606 let f = self.inner.front.load(Ordering::Acquire);
607 atomic::fence(Ordering::SeqCst);
608 let b = self.inner.back.load(Ordering::Acquire);
609 b.wrapping_sub(f).max(0) as usize
610 }
611
612 /// Steals a task from the queue.
613 ///
614 /// # Examples
615 ///
616 /// ```
617 /// use crossbeam_deque::{Steal, Worker};
618 ///
619 /// let w = Worker::new_lifo();
620 /// w.push(1);
621 /// w.push(2);
622 ///
623 /// let s = w.stealer();
624 /// assert_eq!(s.steal(), Steal::Success(1));
625 /// assert_eq!(s.steal(), Steal::Success(2));
626 /// ```
627 pub fn steal(&self) -> Steal<T> {
628 // Load the front index.
629 let f = self.inner.front.load(Ordering::Acquire);
630
631 // A SeqCst fence is needed here.
632 //
633 // If the current thread is already pinned (reentrantly), we must manually issue the
634 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
635 // have to.
636 if epoch::is_pinned() {
637 atomic::fence(Ordering::SeqCst);
638 }
639
640 let guard = &epoch::pin();
641
642 // Load the back index.
643 let b = self.inner.back.load(Ordering::Acquire);
644
645 // Is the queue empty?
646 if b.wrapping_sub(f) <= 0 {
647 return Steal::Empty;
648 }
649
650 // Load the buffer and read the task at the front.
651 let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
652 let task = unsafe { buffer.deref().read(f) };
653
654 // Try incrementing the front index to steal the task.
655 // If the buffer has been swapped or the increment fails, we retry.
656 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
657 || self
658 .inner
659 .front
660 .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
661 .is_err()
662 {
663 // We didn't steal this task, forget it.
664 mem::forget(task);
665 return Steal::Retry;
666 }
667
668 // Return the stolen task.
669 Steal::Success(task)
670 }
671
672 /// Steals a batch of tasks and pushes them into another worker.
673 ///
674 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
675 /// steal around half of the tasks in the queue, but also not more than some constant limit.
676 ///
677 /// # Examples
678 ///
679 /// ```
680 /// use crossbeam_deque::Worker;
681 ///
682 /// let w1 = Worker::new_fifo();
683 /// w1.push(1);
684 /// w1.push(2);
685 /// w1.push(3);
686 /// w1.push(4);
687 ///
688 /// let s = w1.stealer();
689 /// let w2 = Worker::new_fifo();
690 ///
691 /// let _ = s.steal_batch(&w2);
692 /// assert_eq!(w2.pop(), Some(1));
693 /// assert_eq!(w2.pop(), Some(2));
694 /// ```
695 pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
696 if Arc::ptr_eq(&self.inner, &dest.inner) {
697 if dest.is_empty() {
698 return Steal::Empty;
699 } else {
700 return Steal::Success(());
701 }
702 }
703
704 // Load the front index.
705 let mut f = self.inner.front.load(Ordering::Acquire);
706
707 // A SeqCst fence is needed here.
708 //
709 // If the current thread is already pinned (reentrantly), we must manually issue the
710 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
711 // have to.
712 if epoch::is_pinned() {
713 atomic::fence(Ordering::SeqCst);
714 }
715
716 let guard = &epoch::pin();
717
718 // Load the back index.
719 let b = self.inner.back.load(Ordering::Acquire);
720
721 // Is the queue empty?
722 let len = b.wrapping_sub(f);
723 if len <= 0 {
724 return Steal::Empty;
725 }
726
727 // Reserve capacity for the stolen batch.
728 let batch_size = cmp::min((len as usize + 1) / 2, MAX_BATCH);
729 dest.reserve(batch_size);
730 let mut batch_size = batch_size as isize;
731
732 // Get the destination buffer and back index.
733 let dest_buffer = dest.buffer.get();
734 let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
735
736 // Load the buffer.
737 let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
738
739 match self.flavor {
740 // Steal a batch of tasks from the front at once.
741 Flavor::Fifo => {
742 // Copy the batch from the source to the destination buffer.
743 match dest.flavor {
744 Flavor::Fifo => {
745 for i in 0..batch_size {
746 unsafe {
747 let task = buffer.deref().read(f.wrapping_add(i));
748 dest_buffer.write(dest_b.wrapping_add(i), task);
749 }
750 }
751 }
752 Flavor::Lifo => {
753 for i in 0..batch_size {
754 unsafe {
755 let task = buffer.deref().read(f.wrapping_add(i));
756 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
757 }
758 }
759 }
760 }
761
762 // Try incrementing the front index to steal the batch.
763 // If the buffer has been swapped or the increment fails, we retry.
764 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
765 || self
766 .inner
767 .front
768 .compare_exchange(
769 f,
770 f.wrapping_add(batch_size),
771 Ordering::SeqCst,
772 Ordering::Relaxed,
773 )
774 .is_err()
775 {
776 return Steal::Retry;
777 }
778
779 dest_b = dest_b.wrapping_add(batch_size);
780 }
781
782 // Steal a batch of tasks from the front one by one.
783 Flavor::Lifo => {
784 // This loop may modify the batch_size, which triggers a clippy lint warning.
785 // Use a new variable to avoid the warning, and to make it clear we aren't
786 // modifying the loop exit condition during iteration.
787 let original_batch_size = batch_size;
788
789 for i in 0..original_batch_size {
790 // If this is not the first steal, check whether the queue is empty.
791 if i > 0 {
792 // We've already got the current front index. Now execute the fence to
793 // synchronize with other threads.
794 atomic::fence(Ordering::SeqCst);
795
796 // Load the back index.
797 let b = self.inner.back.load(Ordering::Acquire);
798
799 // Is the queue empty?
800 if b.wrapping_sub(f) <= 0 {
801 batch_size = i;
802 break;
803 }
804 }
805
806 // Read the task at the front.
807 let task = unsafe { buffer.deref().read(f) };
808
809 // Try incrementing the front index to steal the task.
810 // If the buffer has been swapped or the increment fails, we retry.
811 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
812 || self
813 .inner
814 .front
815 .compare_exchange(
816 f,
817 f.wrapping_add(1),
818 Ordering::SeqCst,
819 Ordering::Relaxed,
820 )
821 .is_err()
822 {
823 // We didn't steal this task, forget it and break from the loop.
824 mem::forget(task);
825 batch_size = i;
826 break;
827 }
828
829 // Write the stolen task into the destination buffer.
830 unsafe {
831 dest_buffer.write(dest_b, task);
832 }
833
834 // Move the source front index and the destination back index one step forward.
835 f = f.wrapping_add(1);
836 dest_b = dest_b.wrapping_add(1);
837 }
838
839 // If we didn't steal anything, the operation needs to be retried.
840 if batch_size == 0 {
841 return Steal::Retry;
842 }
843
844 // If stealing into a FIFO queue, stolen tasks need to be reversed.
845 if dest.flavor == Flavor::Fifo {
846 for i in 0..batch_size / 2 {
847 unsafe {
848 let i1 = dest_b.wrapping_sub(batch_size - i);
849 let i2 = dest_b.wrapping_sub(i + 1);
850 let t1 = dest_buffer.read(i1);
851 let t2 = dest_buffer.read(i2);
852 dest_buffer.write(i1, t2);
853 dest_buffer.write(i2, t1);
854 }
855 }
856 }
857 }
858 }
859
860 atomic::fence(Ordering::Release);
861
862 // Update the back index in the destination queue.
863 //
864 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
865 // races because it doesn't understand fences.
866 dest.inner.back.store(dest_b, Ordering::Release);
867
868 // Return with success.
869 Steal::Success(())
870 }
871
872 /// Steals a batch of tasks, pushes them into another worker, and pops a task from that worker.
873 ///
874 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
875 /// steal around half of the tasks in the queue, but also not more than some constant limit.
876 ///
877 /// # Examples
878 ///
879 /// ```
880 /// use crossbeam_deque::{Steal, Worker};
881 ///
882 /// let w1 = Worker::new_fifo();
883 /// w1.push(1);
884 /// w1.push(2);
885 /// w1.push(3);
886 /// w1.push(4);
887 ///
888 /// let s = w1.stealer();
889 /// let w2 = Worker::new_fifo();
890 ///
891 /// assert_eq!(s.steal_batch_and_pop(&w2), Steal::Success(1));
892 /// assert_eq!(w2.pop(), Some(2));
893 /// ```
894 pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
895 if Arc::ptr_eq(&self.inner, &dest.inner) {
896 match dest.pop() {
897 None => return Steal::Empty,
898 Some(task) => return Steal::Success(task),
899 }
900 }
901
902 // Load the front index.
903 let mut f = self.inner.front.load(Ordering::Acquire);
904
905 // A SeqCst fence is needed here.
906 //
907 // If the current thread is already pinned (reentrantly), we must manually issue the
908 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
909 // have to.
910 if epoch::is_pinned() {
911 atomic::fence(Ordering::SeqCst);
912 }
913
914 let guard = &epoch::pin();
915
916 // Load the back index.
917 let b = self.inner.back.load(Ordering::Acquire);
918
919 // Is the queue empty?
920 let len = b.wrapping_sub(f);
921 if len <= 0 {
922 return Steal::Empty;
923 }
924
925 // Reserve capacity for the stolen batch.
926 let batch_size = cmp::min((len as usize - 1) / 2, MAX_BATCH - 1);
927 dest.reserve(batch_size);
928 let mut batch_size = batch_size as isize;
929
930 // Get the destination buffer and back index.
931 let dest_buffer = dest.buffer.get();
932 let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
933
934 // Load the buffer.
935 let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
936
937 // Read the task at the front.
938 let mut task = unsafe { buffer.deref().read(f) };
939
940 match self.flavor {
941 // Steal a batch of tasks from the front at once.
942 Flavor::Fifo => {
943 // Copy the batch from the source to the destination buffer.
944 match dest.flavor {
945 Flavor::Fifo => {
946 for i in 0..batch_size {
947 unsafe {
948 let task = buffer.deref().read(f.wrapping_add(i + 1));
949 dest_buffer.write(dest_b.wrapping_add(i), task);
950 }
951 }
952 }
953 Flavor::Lifo => {
954 for i in 0..batch_size {
955 unsafe {
956 let task = buffer.deref().read(f.wrapping_add(i + 1));
957 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
958 }
959 }
960 }
961 }
962
963 // Try incrementing the front index to steal the task.
964 // If the buffer has been swapped or the increment fails, we retry.
965 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
966 || self
967 .inner
968 .front
969 .compare_exchange(
970 f,
971 f.wrapping_add(batch_size + 1),
972 Ordering::SeqCst,
973 Ordering::Relaxed,
974 )
975 .is_err()
976 {
977 // We didn't steal this task, forget it.
978 mem::forget(task);
979 return Steal::Retry;
980 }
981
982 dest_b = dest_b.wrapping_add(batch_size);
983 }
984
985 // Steal a batch of tasks from the front one by one.
986 Flavor::Lifo => {
987 // Try incrementing the front index to steal the task.
988 if self
989 .inner
990 .front
991 .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
992 .is_err()
993 {
994 // We didn't steal this task, forget it.
995 mem::forget(task);
996 return Steal::Retry;
997 }
998
999 // Move the front index one step forward.
1000 f = f.wrapping_add(1);
1001
1002 // Repeat the same procedure for the batch steals.
1003 //
1004 // This loop may modify the batch_size, which triggers a clippy lint warning.
1005 // Use a new variable to avoid the warning, and to make it clear we aren't
1006 // modifying the loop exit condition during iteration.
1007 let original_batch_size = batch_size;
1008 for i in 0..original_batch_size {
1009 // We've already got the current front index. Now execute the fence to
1010 // synchronize with other threads.
1011 atomic::fence(Ordering::SeqCst);
1012
1013 // Load the back index.
1014 let b = self.inner.back.load(Ordering::Acquire);
1015
1016 // Is the queue empty?
1017 if b.wrapping_sub(f) <= 0 {
1018 batch_size = i;
1019 break;
1020 }
1021
1022 // Read the task at the front.
1023 let tmp = unsafe { buffer.deref().read(f) };
1024
1025 // Try incrementing the front index to steal the task.
1026 // If the buffer has been swapped or the increment fails, we retry.
1027 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
1028 || self
1029 .inner
1030 .front
1031 .compare_exchange(
1032 f,
1033 f.wrapping_add(1),
1034 Ordering::SeqCst,
1035 Ordering::Relaxed,
1036 )
1037 .is_err()
1038 {
1039 // We didn't steal this task, forget it and break from the loop.
1040 mem::forget(tmp);
1041 batch_size = i;
1042 break;
1043 }
1044
1045 // Write the previously stolen task into the destination buffer.
1046 unsafe {
1047 dest_buffer.write(dest_b, mem::replace(&mut task, tmp));
1048 }
1049
1050 // Move the source front index and the destination back index one step forward.
1051 f = f.wrapping_add(1);
1052 dest_b = dest_b.wrapping_add(1);
1053 }
1054
1055 // If stealing into a FIFO queue, stolen tasks need to be reversed.
1056 if dest.flavor == Flavor::Fifo {
1057 for i in 0..batch_size / 2 {
1058 unsafe {
1059 let i1 = dest_b.wrapping_sub(batch_size - i);
1060 let i2 = dest_b.wrapping_sub(i + 1);
1061 let t1 = dest_buffer.read(i1);
1062 let t2 = dest_buffer.read(i2);
1063 dest_buffer.write(i1, t2);
1064 dest_buffer.write(i2, t1);
1065 }
1066 }
1067 }
1068 }
1069 }
1070
1071 atomic::fence(Ordering::Release);
1072
1073 // Update the back index in the destination queue.
1074 //
1075 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
1076 // races because it doesn't understand fences.
1077 dest.inner.back.store(dest_b, Ordering::Release);
1078
1079 // Return with success.
1080 Steal::Success(task)
1081 }
1082}
1083
1084impl<T> Clone for Stealer<T> {
1085 fn clone(&self) -> Stealer<T> {
1086 Stealer {
1087 inner: self.inner.clone(),
1088 flavor: self.flavor,
1089 }
1090 }
1091}
1092
1093impl<T> fmt::Debug for Stealer<T> {
1094 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1095 f.pad("Stealer { .. }")
1096 }
1097}
1098
1099// Bits indicating the state of a slot:
1100// * If a task has been written into the slot, `WRITE` is set.
1101// * If a task has been read from the slot, `READ` is set.
1102// * If the block is being destroyed, `DESTROY` is set.
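// For example, a consumer that sets `READ` and finds `DESTROY` already set takes over
// destruction of the block, while `Block::destroy` stops at the first slot whose `READ`
// bit is still clear and leaves destruction to that slot's reader.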
1103const WRITE: usize = 1;
1104const READ: usize = 2;
1105const DESTROY: usize = 4;
1106
1107// Each block covers one "lap" of indices.
1108const LAP: usize = 64;
1109// The maximum number of values a block can hold.
1110const BLOCK_CAP: usize = LAP - 1;
1111// How many lower bits are reserved for metadata.
1112const SHIFT: usize = 1;
1113// Indicates that the block is not the last one.
1114const HAS_NEXT: usize = 1;
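// For example, a head index of `((LAP + 3) << SHIFT) | HAS_NEXT` refers to offset 3 in the
// second block and records that a further block is already linked; `index >> SHIFT` recovers
// the slot counter and `(index >> SHIFT) % LAP` the offset within a block.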
1115
1116/// A slot in a block.
1117struct Slot<T> {
1118 /// The task.
1119 task: UnsafeCell<MaybeUninit<T>>,
1120
1121 /// The state of the slot.
1122 state: AtomicUsize,
1123}
1124
1125impl<T> Slot<T> {
1126 /// Waits until a task is written into the slot.
1127 fn wait_write(&self) {
1128 let backoff = Backoff::new();
1129 while self.state.load(Ordering::Acquire) & WRITE == 0 {
1130 backoff.snooze();
1131 }
1132 }
1133}
1134
1135/// A block in a linked list.
1136///
1137/// Each block in the list can hold up to `BLOCK_CAP` values.
1138struct Block<T> {
1139 /// The next block in the linked list.
1140 next: AtomicPtr<Block<T>>,
1141
1142 /// Slots for values.
1143 slots: [Slot<T>; BLOCK_CAP],
1144}
1145
1146impl<T> Block<T> {
1147 /// Creates an empty block.
1148 fn new() -> Block<T> {
1149 // SAFETY: This is safe because:
1150 // [1] `Block::next` (AtomicPtr) may be safely zero initialized.
1151 // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
1152 // [3] `Slot::task` (UnsafeCell) may be safely zero initialized because it
1153 // holds a MaybeUninit.
1154 // [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
1155 unsafe { MaybeUninit::zeroed().assume_init() }
1156 }
1157
1158 /// Waits until the next pointer is set.
1159 fn wait_next(&self) -> *mut Block<T> {
1160 let backoff = Backoff::new();
1161 loop {
1162 let next = self.next.load(Ordering::Acquire);
1163 if !next.is_null() {
1164 return next;
1165 }
1166 backoff.snooze();
1167 }
1168 }
1169
1170 /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
1171 unsafe fn destroy(this: *mut Block<T>, count: usize) {
1172 // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
1173 // begun destruction of the block.
1174 for i in (0..count).rev() {
1175 let slot = (*this).slots.get_unchecked(i);
1176
1177 // Mark the `DESTROY` bit if a thread is still using the slot.
1178 if slot.state.load(Ordering::Acquire) & READ == 0
1179 && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
1180 {
1181 // If a thread is still using the slot, it will continue destruction of the block.
1182 return;
1183 }
1184 }
1185
1186 // No thread is using the block, now it is safe to destroy it.
1187 drop(Box::from_raw(this));
1188 }
1189}
1190
1191/// A position in a queue.
1192struct Position<T> {
1193 /// The index in the queue.
1194 index: AtomicUsize,
1195
1196 /// The block in the linked list.
1197 block: AtomicPtr<Block<T>>,
1198}
1199
1200/// An injector queue.
1201///
1202/// This is a FIFO queue that can be shared among multiple threads. Task schedulers typically have
1203/// a single injector queue, which is the entry point for new tasks.
1204///
1205/// # Examples
1206///
1207/// ```
1208/// use crossbeam_deque::{Injector, Steal};
1209///
1210/// let q = Injector::new();
1211/// q.push(1);
1212/// q.push(2);
1213///
1214/// assert_eq!(q.steal(), Steal::Success(1));
1215/// assert_eq!(q.steal(), Steal::Success(2));
1216/// assert_eq!(q.steal(), Steal::Empty);
1217/// ```
1218pub struct Injector<T> {
1219 /// The head of the queue.
1220 head: CachePadded<Position<T>>,
1221
1222 /// The tail of the queue.
1223 tail: CachePadded<Position<T>>,
1224
1225 /// Indicates that dropping an `Injector<T>` may drop values of type `T`.
1226 _marker: PhantomData<T>,
1227}
1228
1229unsafe impl<T: Send> Send for Injector<T> {}
1230unsafe impl<T: Send> Sync for Injector<T> {}
1231
1232impl<T> Default for Injector<T> {
1233 fn default() -> Self {
1234 let block = Box::into_raw(Box::new(Block::<T>::new()));
1235 Self {
1236 head: CachePadded::new(Position {
1237 block: AtomicPtr::new(block),
1238 index: AtomicUsize::new(0),
1239 }),
1240 tail: CachePadded::new(Position {
1241 block: AtomicPtr::new(block),
1242 index: AtomicUsize::new(0),
1243 }),
1244 _marker: PhantomData,
1245 }
1246 }
1247}
1248
1249impl<T> Injector<T> {
1250 /// Creates a new injector queue.
1251 ///
1252 /// # Examples
1253 ///
1254 /// ```
1255 /// use crossbeam_deque::Injector;
1256 ///
1257 /// let q = Injector::<i32>::new();
1258 /// ```
1259 pub fn new() -> Injector<T> {
1260 Self::default()
1261 }
1262
1263 /// Pushes a task into the queue.
1264 ///
1265 /// # Examples
1266 ///
1267 /// ```
1268 /// use crossbeam_deque::Injector;
1269 ///
1270 /// let w = Injector::new();
1271 /// w.push(1);
1272 /// w.push(2);
1273 /// ```
1274 pub fn push(&self, task: T) {
1275 let backoff = Backoff::new();
1276 let mut tail = self.tail.index.load(Ordering::Acquire);
1277 let mut block = self.tail.block.load(Ordering::Acquire);
1278 let mut next_block = None;
1279
1280 loop {
1281 // Calculate the offset of the index into the block.
1282 let offset = (tail >> SHIFT) % LAP;
1283
1284 // If we reached the end of the block, wait until the next one is installed.
1285 if offset == BLOCK_CAP {
1286 backoff.snooze();
1287 tail = self.tail.index.load(Ordering::Acquire);
1288 block = self.tail.block.load(Ordering::Acquire);
1289 continue;
1290 }
1291
1292 // If we're going to have to install the next block, allocate it in advance in order to
1293 // make the wait for other threads as short as possible.
1294 if offset + 1 == BLOCK_CAP && next_block.is_none() {
1295 next_block = Some(Box::new(Block::<T>::new()));
1296 }
1297
1298 let new_tail = tail + (1 << SHIFT);
1299
1300 // Try advancing the tail forward.
1301 match self.tail.index.compare_exchange_weak(
1302 tail,
1303 new_tail,
1304 Ordering::SeqCst,
1305 Ordering::Acquire,
1306 ) {
1307 Ok(_) => unsafe {
1308 // If we've reached the end of the block, install the next one.
1309 if offset + 1 == BLOCK_CAP {
1310 let next_block = Box::into_raw(next_block.unwrap());
1311 let next_index = new_tail.wrapping_add(1 << SHIFT);
1312
1313 self.tail.block.store(next_block, Ordering::Release);
1314 self.tail.index.store(next_index, Ordering::Release);
1315 (*block).next.store(next_block, Ordering::Release);
1316 }
1317
1318 // Write the task into the slot.
1319 let slot = (*block).slots.get_unchecked(offset);
1320 slot.task.get().write(MaybeUninit::new(task));
1321 slot.state.fetch_or(WRITE, Ordering::Release);
1322
1323 return;
1324 },
1325 Err(t) => {
1326 tail = t;
1327 block = self.tail.block.load(Ordering::Acquire);
1328 backoff.spin();
1329 }
1330 }
1331 }
1332 }
1333
1334 /// Steals a task from the queue.
1335 ///
1336 /// # Examples
1337 ///
1338 /// ```
1339 /// use crossbeam_deque::{Injector, Steal};
1340 ///
1341 /// let q = Injector::new();
1342 /// q.push(1);
1343 /// q.push(2);
1344 ///
1345 /// assert_eq!(q.steal(), Steal::Success(1));
1346 /// assert_eq!(q.steal(), Steal::Success(2));
1347 /// assert_eq!(q.steal(), Steal::Empty);
1348 /// ```
1349 pub fn steal(&self) -> Steal<T> {
1350 let mut head;
1351 let mut block;
1352 let mut offset;
1353
1354 let backoff = Backoff::new();
1355 loop {
1356 head = self.head.index.load(Ordering::Acquire);
1357 block = self.head.block.load(Ordering::Acquire);
1358
1359 // Calculate the offset of the index into the block.
1360 offset = (head >> SHIFT) % LAP;
1361
1362 // If we reached the end of the block, wait until the next one is installed.
1363 if offset == BLOCK_CAP {
1364 backoff.snooze();
1365 } else {
1366 break;
1367 }
1368 }
1369
1370 let mut new_head = head + (1 << SHIFT);
1371
1372 if new_head & HAS_NEXT == 0 {
1373 atomic::fence(Ordering::SeqCst);
1374 let tail = self.tail.index.load(Ordering::Relaxed);
1375
1376 // If the tail equals the head, that means the queue is empty.
1377 if head >> SHIFT == tail >> SHIFT {
1378 return Steal::Empty;
1379 }
1380
1381 // If head and tail are not in the same block, set `HAS_NEXT` in head.
1382 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1383 new_head |= HAS_NEXT;
1384 }
1385 }
1386
1387 // Try moving the head index forward.
1388 if self
1389 .head
1390 .index
1391 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1392 .is_err()
1393 {
1394 return Steal::Retry;
1395 }
1396
1397 unsafe {
1398 // If we've reached the end of the block, move to the next one.
1399 if offset + 1 == BLOCK_CAP {
1400 let next = (*block).wait_next();
1401 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1402 if !(*next).next.load(Ordering::Relaxed).is_null() {
1403 next_index |= HAS_NEXT;
1404 }
1405
1406 self.head.block.store(next, Ordering::Release);
1407 self.head.index.store(next_index, Ordering::Release);
1408 }
1409
1410 // Read the task.
1411 let slot = (*block).slots.get_unchecked(offset);
1412 slot.wait_write();
1413 let task = slot.task.get().read().assume_init();
1414
1415 // Destroy the block if we've reached the end, or if another thread wanted to destroy
1416 // but couldn't because we were busy reading from the slot.
1417 if (offset + 1 == BLOCK_CAP)
1418 || (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0)
1419 {
1420 Block::destroy(block, offset);
1421 }
1422
1423 Steal::Success(task)
1424 }
1425 }
1426
1427 /// Steals a batch of tasks and pushes them into a worker.
1428 ///
1429 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1430 /// steal around half of the tasks in the queue, but also not more than some constant limit.
1431 ///
1432 /// # Examples
1433 ///
1434 /// ```
1435 /// use crossbeam_deque::{Injector, Worker};
1436 ///
1437 /// let q = Injector::new();
1438 /// q.push(1);
1439 /// q.push(2);
1440 /// q.push(3);
1441 /// q.push(4);
1442 ///
1443 /// let w = Worker::new_fifo();
1444 /// let _ = q.steal_batch(&w);
1445 /// assert_eq!(w.pop(), Some(1));
1446 /// assert_eq!(w.pop(), Some(2));
1447 /// ```
1448 pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
1449 let mut head;
1450 let mut block;
1451 let mut offset;
1452
1453 let backoff = Backoff::new();
1454 loop {
1455 head = self.head.index.load(Ordering::Acquire);
1456 block = self.head.block.load(Ordering::Acquire);
1457
1458 // Calculate the offset of the index into the block.
1459 offset = (head >> SHIFT) % LAP;
1460
1461 // If we reached the end of the block, wait until the next one is installed.
1462 if offset == BLOCK_CAP {
1463 backoff.snooze();
1464 } else {
1465 break;
1466 }
1467 }
1468
1469 let mut new_head = head;
1470 let advance;
1471
1472 if new_head & HAS_NEXT == 0 {
1473 atomic::fence(Ordering::SeqCst);
1474 let tail = self.tail.index.load(Ordering::Relaxed);
1475
1476 // If the tail equals the head, that means the queue is empty.
1477 if head >> SHIFT == tail >> SHIFT {
1478 return Steal::Empty;
1479 }
1480
1481 // If head and tail are not in the same block, set `HAS_NEXT` in head. Also, calculate
1482 // the right batch size to steal.
1483 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1484 new_head |= HAS_NEXT;
1485 // We can steal all tasks till the end of the block.
1486 advance = (BLOCK_CAP - offset).min(MAX_BATCH);
1487 } else {
1488 let len = (tail - head) >> SHIFT;
1489 // Steal half of the available tasks.
1490 advance = ((len + 1) / 2).min(MAX_BATCH);
1491 }
1492 } else {
1493 // We can steal all tasks till the end of the block.
1494 advance = (BLOCK_CAP - offset).min(MAX_BATCH);
1495 }
1496
1497 new_head += advance << SHIFT;
1498 let new_offset = offset + advance;
1499
1500 // Try moving the head index forward.
1501 if self
1502 .head
1503 .index
1504 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1505 .is_err()
1506 {
1507 return Steal::Retry;
1508 }
1509
1510 // Reserve capacity for the stolen batch.
1511 let batch_size = new_offset - offset;
1512 dest.reserve(batch_size);
1513
1514 // Get the destination buffer and back index.
1515 let dest_buffer = dest.buffer.get();
1516 let dest_b = dest.inner.back.load(Ordering::Relaxed);
1517
1518 unsafe {
1519 // If we've reached the end of the block, move to the next one.
1520 if new_offset == BLOCK_CAP {
1521 let next = (*block).wait_next();
1522 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1523 if !(*next).next.load(Ordering::Relaxed).is_null() {
1524 next_index |= HAS_NEXT;
1525 }
1526
1527 self.head.block.store(next, Ordering::Release);
1528 self.head.index.store(next_index, Ordering::Release);
1529 }
1530
1531 // Copy values from the injector into the destination queue.
1532 match dest.flavor {
1533 Flavor::Fifo => {
1534 for i in 0..batch_size {
1535 // Read the task.
1536 let slot = (*block).slots.get_unchecked(offset + i);
1537 slot.wait_write();
1538 let task = slot.task.get().read().assume_init();
1539
1540 // Write it into the destination queue.
1541 dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1542 }
1543 }
1544
1545 Flavor::Lifo => {
1546 for i in 0..batch_size {
1547 // Read the task.
1548 let slot = (*block).slots.get_unchecked(offset + i);
1549 slot.wait_write();
1550 let task = slot.task.get().read().assume_init();
1551
1552 // Write it into the destination queue.
1553 dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1554 }
1555 }
1556 }
1557
1558 atomic::fence(Ordering::Release);
1559
1560 // Update the back index in the destination queue.
1561 //
1562 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1563 // data races because it doesn't understand fences.
1564 dest.inner
1565 .back
1566 .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1567
1568 // Destroy the block if we've reached the end, or if another thread wanted to destroy
1569 // but couldn't because we were busy reading from the slot.
1570 if new_offset == BLOCK_CAP {
1571 Block::destroy(block, offset);
1572 } else {
1573 for i in offset..new_offset {
1574 let slot = (*block).slots.get_unchecked(i);
1575
1576 if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1577 Block::destroy(block, offset);
1578 break;
1579 }
1580 }
1581 }
1582
1583 Steal::Success(())
1584 }
1585 }
1586
1587 /// Steals a batch of tasks, pushes them into a worker, and pops a task from that worker.
1588 ///
1589 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1590 /// steal around half of the tasks in the queue, but also not more than some constant limit.
1591 ///
1592 /// # Examples
1593 ///
1594 /// ```
1595 /// use crossbeam_deque::{Injector, Steal, Worker};
1596 ///
1597 /// let q = Injector::new();
1598 /// q.push(1);
1599 /// q.push(2);
1600 /// q.push(3);
1601 /// q.push(4);
1602 ///
1603 /// let w = Worker::new_fifo();
1604 /// assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(1));
1605 /// assert_eq!(w.pop(), Some(2));
1606 /// ```
1607 pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
1608 let mut head;
1609 let mut block;
1610 let mut offset;
1611
1612 let backoff = Backoff::new();
1613 loop {
1614 head = self.head.index.load(Ordering::Acquire);
1615 block = self.head.block.load(Ordering::Acquire);
1616
1617 // Calculate the offset of the index into the block.
1618 offset = (head >> SHIFT) % LAP;
1619
1620 // If we reached the end of the block, wait until the next one is installed.
1621 if offset == BLOCK_CAP {
1622 backoff.snooze();
1623 } else {
1624 break;
1625 }
1626 }
1627
1628 let mut new_head = head;
1629 let advance;
1630
1631 if new_head & HAS_NEXT == 0 {
1632 atomic::fence(Ordering::SeqCst);
1633 let tail = self.tail.index.load(Ordering::Relaxed);
1634
1635 // If the tail equals the head, that means the queue is empty.
1636 if head >> SHIFT == tail >> SHIFT {
1637 return Steal::Empty;
1638 }
1639
1640 // If head and tail are not in the same block, set `HAS_NEXT` in head.
1641 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1642 new_head |= HAS_NEXT;
1643 // We can steal all tasks till the end of the block.
1644 advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1);
1645 } else {
1646 let len = (tail - head) >> SHIFT;
1647 // Steal half of the available tasks.
1648 advance = ((len + 1) / 2).min(MAX_BATCH + 1);
1649 }
1650 } else {
1651 // We can steal all tasks till the end of the block.
1652 advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1);
1653 }
1654
1655 new_head += advance << SHIFT;
1656 let new_offset = offset + advance;
1657
1658 // Try moving the head index forward.
1659 if self
1660 .head
1661 .index
1662 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1663 .is_err()
1664 {
1665 return Steal::Retry;
1666 }
1667
1668 // Reserve capacity for the stolen batch.
1669 let batch_size = new_offset - offset - 1;
1670 dest.reserve(batch_size);
1671
1672 // Get the destination buffer and back index.
1673 let dest_buffer = dest.buffer.get();
1674 let dest_b = dest.inner.back.load(Ordering::Relaxed);
1675
1676 unsafe {
1677 // If we've reached the end of the block, move to the next one.
1678 if new_offset == BLOCK_CAP {
1679 let next = (*block).wait_next();
1680 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1681 if !(*next).next.load(Ordering::Relaxed).is_null() {
1682 next_index |= HAS_NEXT;
1683 }
1684
1685 self.head.block.store(next, Ordering::Release);
1686 self.head.index.store(next_index, Ordering::Release);
1687 }
1688
1689 // Read the task.
1690 let slot = (*block).slots.get_unchecked(offset);
1691 slot.wait_write();
1692 let task = slot.task.get().read().assume_init();
1693
1694 match dest.flavor {
1695 Flavor::Fifo => {
1696 // Copy values from the injector into the destination queue.
1697 for i in 0..batch_size {
1698 // Read the task.
1699 let slot = (*block).slots.get_unchecked(offset + i + 1);
1700 slot.wait_write();
1701 let task = slot.task.get().read().assume_init();
1702
1703 // Write it into the destination queue.
1704 dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1705 }
1706 }
1707
1708 Flavor::Lifo => {
1709 // Copy values from the injector into the destination queue.
1710 for i in 0..batch_size {
1711 // Read the task.
1712 let slot = (*block).slots.get_unchecked(offset + i + 1);
1713 slot.wait_write();
1714 let task = slot.task.get().read().assume_init();
1715
1716 // Write it into the destination queue.
1717 dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1718 }
1719 }
1720 }
1721
1722 atomic::fence(Ordering::Release);
1723
1724 // Update the back index in the destination queue.
1725 //
1726 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1727 // data races because it doesn't understand fences.
1728 dest.inner
1729 .back
1730 .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1731
1732 // Destroy the block if we've reached the end, or if another thread wanted to destroy
1733 // but couldn't because we were busy reading from the slot.
1734 if new_offset == BLOCK_CAP {
1735 Block::destroy(block, offset);
1736 } else {
1737 for i in offset..new_offset {
1738 let slot = (*block).slots.get_unchecked(i);
1739
1740 if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1741 Block::destroy(block, offset);
1742 break;
1743 }
1744 }
1745 }
1746
1747 Steal::Success(task)
1748 }
1749 }
1750
1751 /// Returns `true` if the queue is empty.
1752 ///
1753 /// # Examples
1754 ///
1755 /// ```
1756 /// use crossbeam_deque::Injector;
1757 ///
1758 /// let q = Injector::new();
1759 ///
1760 /// assert!(q.is_empty());
1761 /// q.push(1);
1762 /// assert!(!q.is_empty());
1763 /// ```
1764 pub fn is_empty(&self) -> bool {
1765 let head = self.head.index.load(Ordering::SeqCst);
1766 let tail = self.tail.index.load(Ordering::SeqCst);
1767 head >> SHIFT == tail >> SHIFT
1768 }
1769
1770 /// Returns the number of tasks in the queue.
1771 ///
1772 /// # Examples
1773 ///
1774 /// ```
1775 /// use crossbeam_deque::Injector;
1776 ///
1777 /// let q = Injector::new();
1778 ///
1779 /// assert_eq!(q.len(), 0);
1780 /// q.push(1);
1781 /// assert_eq!(q.len(), 1);
1782 /// q.push(1);
1783 /// assert_eq!(q.len(), 2);
1784 /// ```
1785 pub fn len(&self) -> usize {
1786 loop {
1787 // Load the tail index, then load the head index.
1788 let mut tail = self.tail.index.load(Ordering::SeqCst);
1789 let mut head = self.head.index.load(Ordering::SeqCst);
1790
1791 // If the tail index didn't change, we've got consistent indices to work with.
1792 if self.tail.index.load(Ordering::SeqCst) == tail {
1793 // Erase the lower bits.
1794 tail &= !((1 << SHIFT) - 1);
1795 head &= !((1 << SHIFT) - 1);
1796
1797 // Fix up indices if they fall onto block ends.
1798 if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
1799 tail = tail.wrapping_add(1 << SHIFT);
1800 }
1801 if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
1802 head = head.wrapping_add(1 << SHIFT);
1803 }
1804
1805 // Rotate indices so that head falls into the first block.
1806 let lap = (head >> SHIFT) / LAP;
1807 tail = tail.wrapping_sub((lap * LAP) << SHIFT);
1808 head = head.wrapping_sub((lap * LAP) << SHIFT);
1809
1810 // Remove the lower bits.
1811 tail >>= SHIFT;
1812 head >>= SHIFT;
1813
1814 // Return the difference minus the number of blocks between tail and head.
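 // For example, with `head == 3` and `tail == 69` (offset 5 in the second block),
 // this yields 69 - 3 - 69 / 64 = 65 tasks: the unused end-of-block slot at
 // offset 63 is not counted.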
1815 return tail - head - tail / LAP;
1816 }
1817 }
1818 }
1819}
1820
1821impl<T> Drop for Injector<T> {
1822 fn drop(&mut self) {
1823 let mut head = self.head.index.load(Ordering::Relaxed);
1824 let mut tail = self.tail.index.load(Ordering::Relaxed);
1825 let mut block = self.head.block.load(Ordering::Relaxed);
1826
1827 // Erase the lower bits.
1828 head &= !((1 << SHIFT) - 1);
1829 tail &= !((1 << SHIFT) - 1);
1830
1831 unsafe {
1832 // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
1833 while head != tail {
1834 let offset = (head >> SHIFT) % LAP;
1835
1836 if offset < BLOCK_CAP {
1837 // Drop the task in the slot.
1838 let slot = (*block).slots.get_unchecked(offset);
1839 let p = &mut *slot.task.get();
1840 p.as_mut_ptr().drop_in_place();
1841 } else {
1842 // Deallocate the block and move to the next one.
1843 let next = (*block).next.load(Ordering::Relaxed);
1844 drop(Box::from_raw(block));
1845 block = next;
1846 }
1847
1848 head = head.wrapping_add(1 << SHIFT);
1849 }
1850
1851 // Deallocate the last remaining block.
1852 drop(Box::from_raw(block));
1853 }
1854 }
1855}
1856
1857impl<T> fmt::Debug for Injector<T> {
1858 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1859 f.pad("Worker { .. }")
1860 }
1861}
1862
1863/// Possible outcomes of a steal operation.
1864///
1865/// # Examples
1866///
1867/// There are lots of ways to chain results of steal operations together:
1868///
1869/// ```
1870/// use crossbeam_deque::Steal::{self, Empty, Retry, Success};
1871///
1872/// let collect = |v: Vec<Steal<i32>>| v.into_iter().collect::<Steal<i32>>();
1873///
1874/// assert_eq!(collect(vec![Empty, Empty, Empty]), Empty);
1875/// assert_eq!(collect(vec![Empty, Retry, Empty]), Retry);
1876/// assert_eq!(collect(vec![Retry, Success(1), Empty]), Success(1));
1877///
1878/// assert_eq!(collect(vec![Empty, Empty]).or_else(|| Retry), Retry);
1879/// assert_eq!(collect(vec![Retry, Empty]).or_else(|| Success(1)), Success(1));
1880/// ```
1881#[must_use]
1882#[derive(PartialEq, Eq, Copy, Clone)]
1883pub enum Steal<T> {
1884 /// The queue was empty at the time of stealing.
1885 Empty,
1886
1887 /// At least one task was successfully stolen.
1888 Success(T),
1889
1890 /// The steal operation needs to be retried.
1891 Retry,
1892}
1893
1894impl<T> Steal<T> {
1895 /// Returns `true` if the queue was empty at the time of stealing.
1896 ///
1897 /// # Examples
1898 ///
1899 /// ```
1900 /// use crossbeam_deque::Steal::{Empty, Retry, Success};
1901 ///
1902 /// assert!(!Success(7).is_empty());
1903 /// assert!(!Retry::<i32>.is_empty());
1904 ///
1905 /// assert!(Empty::<i32>.is_empty());
1906 /// ```
1907 pub fn is_empty(&self) -> bool {
1908 match self {
1909 Steal::Empty => true,
1910 _ => false,
1911 }
1912 }
1913
1914 /// Returns `true` if at least one task was stolen.
1915 ///
1916 /// # Examples
1917 ///
1918 /// ```
1919 /// use crossbeam_deque::Steal::{Empty, Retry, Success};
1920 ///
1921 /// assert!(!Empty::<i32>.is_success());
1922 /// assert!(!Retry::<i32>.is_success());
1923 ///
1924 /// assert!(Success(7).is_success());
1925 /// ```
1926 pub fn is_success(&self) -> bool {
1927 match self {
1928 Steal::Success(_) => true,
1929 _ => false,
1930 }
1931 }
1932
1933 /// Returns `true` if the steal operation needs to be retried.
1934 ///
1935 /// # Examples
1936 ///
1937 /// ```
1938 /// use crossbeam_deque::Steal::{Empty, Retry, Success};
1939 ///
1940 /// assert!(!Empty::<i32>.is_retry());
1941 /// assert!(!Success(7).is_retry());
1942 ///
1943 /// assert!(Retry::<i32>.is_retry());
1944 /// ```
1945 pub fn is_retry(&self) -> bool {
1946 match self {
1947 Steal::Retry => true,
1948 _ => false,
1949 }
1950 }
1951
1952 /// Returns the result of the operation, if successful.
1953 ///
1954 /// # Examples
1955 ///
1956 /// ```
1957 /// use crossbeam_deque::Steal::{Empty, Retry, Success};
1958 ///
1959 /// assert_eq!(Empty::<i32>.success(), None);
1960 /// assert_eq!(Retry::<i32>.success(), None);
1961 ///
1962 /// assert_eq!(Success(7).success(), Some(7));
1963 /// ```
1964 pub fn success(self) -> Option<T> {
1965 match self {
1966 Steal::Success(res) => Some(res),
1967 _ => None,
1968 }
1969 }
1970
1971 /// If no task was stolen, attempts another steal operation.
1972 ///
1973 /// Returns this steal result if it is `Success`. Otherwise, closure `f` is invoked and then:
1974 ///
1975 /// * If the second steal resulted in `Success`, it is returned.
1976 /// * If both steals were unsuccessful but at least one resulted in `Retry`, then `Retry` is returned.
1977 /// * If both resulted in `Empty`, then `Empty` is returned.
1978 ///
1979 /// # Examples
1980 ///
1981 /// ```
1982 /// use crossbeam_deque::Steal::{Empty, Retry, Success};
1983 ///
1984 /// assert_eq!(Success(1).or_else(|| Success(2)), Success(1));
1985 /// assert_eq!(Retry.or_else(|| Success(2)), Success(2));
1986 ///
1987 /// assert_eq!(Retry.or_else(|| Empty), Retry::<i32>);
1988 /// assert_eq!(Empty.or_else(|| Retry), Retry::<i32>);
1989 ///
1990 /// assert_eq!(Empty.or_else(|| Empty), Empty::<i32>);
1991 /// ```
1992 pub fn or_else<F>(self, f: F) -> Steal<T>
1993 where
1994 F: FnOnce() -> Steal<T>,
1995 {
1996 match self {
1997 Steal::Empty => f(),
1998 Steal::Success(_) => self,
1999 Steal::Retry => {
2000 if let Steal::Success(res) = f() {
2001 Steal::Success(res)
2002 } else {
2003 Steal::Retry
2004 }
2005 }
2006 }
2007 }
2008}
2009
2010impl<T> fmt::Debug for Steal<T> {
2011 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2012 match self {
2013 Steal::Empty => f.pad("Empty"),
2014 Steal::Success(_) => f.pad("Success(..)"),
2015 Steal::Retry => f.pad("Retry"),
2016 }
2017 }
2018}
2019
2020impl<T> FromIterator<Steal<T>> for Steal<T> {
2021 /// Consumes items until a `Success` is found and returns it.
2022 ///
2023 /// If no `Success` was found, but there was at least one `Retry`, then returns `Retry`.
2024 /// Otherwise, `Empty` is returned.
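 ///
 /// # Examples
 ///
 /// ```
 /// use crossbeam_deque::Steal::{Empty, Retry, Success};
 ///
 /// let results = vec![Empty, Retry, Success(7), Success(8)];
 /// assert_eq!(results.into_iter().collect::<Steal<i32>>(), Success(7));
 /// ```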
2025 fn from_iter<I>(iter: I) -> Steal<T>
2026 where
2027 I: IntoIterator<Item = Steal<T>>,
2028 {
2029 let mut retry = false;
2030 for s in iter {
2031 match &s {
2032 Steal::Empty => {}
2033 Steal::Success(_) => return s,
2034 Steal::Retry => retry = true,
2035 }
2036 }
2037
2038 if retry {
2039 Steal::Retry
2040 } else {
2041 Steal::Empty
2042 }
2043 }
2044}