// Web-viewer residue (not part of the Rust source): blob 3f74d1bfb9fbdc074de05e73b2cc628c9f267c0c [file] [log] [blame]
Jakub Kotur5dd645c2020-12-21 17:28:14 +01001use std::sync::atomic::Ordering::SeqCst;
2use std::sync::atomic::{AtomicBool, AtomicUsize};
3use std::sync::{Arc, Mutex};
4
5use crossbeam_deque::Steal::{Empty, Success};
6use crossbeam_deque::{Injector, Worker};
7use crossbeam_utils::thread::scope;
8use rand::Rng;
9
/// Single-threaded sanity check: items pushed into an `Injector` are stolen
/// in the order they were pushed, and a drained injector reports `Empty`.
#[test]
fn smoke() {
    let injector = Injector::new();
    assert_eq!(injector.steal(), Empty);

    // Two items queued back to back come out first-in, first-out.
    for item in 1..=2 {
        injector.push(item);
    }
    for expected in 1..=2 {
        assert_eq!(injector.steal(), Success(expected));
    }
    assert_eq!(injector.steal(), Empty);

    // The injector is still usable after having been fully drained.
    injector.push(3);
    assert_eq!(injector.steal(), Success(3));
    assert_eq!(injector.steal(), Empty);
}
25
/// Checks that `Injector::is_empty` tracks pushes and steals on a single
/// thread.
#[test]
fn is_empty() {
    let queue = Injector::new();
    assert!(queue.is_empty());

    // Every push leaves the queue non-empty.
    for item in 1..=2 {
        queue.push(item);
        assert!(!queue.is_empty());
    }

    // One of the two items removed: still non-empty. Both removed: empty.
    let _ = queue.steal();
    assert!(!queue.is_empty());
    let _ = queue.steal();
    assert!(queue.is_empty());

    // Refill and drain once more.
    queue.push(3);
    assert!(!queue.is_empty());
    let _ = queue.steal();
    assert!(queue.is_empty());
}
46
/// One producer (the test thread) and one consumer (a scoped thread) share
/// an `Injector`; the consumer must observe `0..COUNT` in push order and
/// then find the queue empty.
#[test]
fn spsc() {
    const COUNT: usize = 100_000;

    let queue = Injector::new();

    scope(|s| {
        // Consumer: spin until each successive value arrives.
        s.spawn(|_| {
            for expected in 0..COUNT {
                let got = loop {
                    if let Success(value) = queue.steal() {
                        break value;
                    }
                };
                assert_eq!(expected, got);
            }

            assert_eq!(queue.steal(), Empty);
        });

        // Producer: runs on the test thread while the consumer spins.
        (0..COUNT).for_each(|i| queue.push(i));
    })
    .unwrap();
}
73
/// `THREADS` producers each push `0..COUNT` while `THREADS` consumers each
/// steal `COUNT` items; afterwards every value must have been stolen exactly
/// `THREADS` times.
#[test]
fn mpmc() {
    const COUNT: usize = 25_000;
    const THREADS: usize = 4;

    let queue = Injector::new();
    // seen[n] counts how many times the value `n` has been stolen.
    let seen: Vec<AtomicUsize> = std::iter::repeat_with(|| AtomicUsize::new(0))
        .take(COUNT)
        .collect();

    scope(|s| {
        // Producers.
        for _ in 0..THREADS {
            s.spawn(|_| {
                (0..COUNT).for_each(|i| queue.push(i));
            });
        }

        // Consumers: each spins until it has stolen COUNT items.
        for _ in 0..THREADS {
            s.spawn(|_| {
                for _ in 0..COUNT {
                    let n = loop {
                        if let Success(n) = queue.steal() {
                            break n;
                        }
                    };
                    seen[n].fetch_add(1, SeqCst);
                }
            });
        }
    })
    .unwrap();

    for counter in &seen {
        assert_eq!(counter.load(SeqCst), THREADS);
    }
}
110
/// Pre-fills an `Injector` with the boxed values `1..=COUNT`, then lets
/// `THREADS` scoped threads plus the test thread steal concurrently until
/// the shared `remaining` counter hits zero. Each thread asserts that the
/// values it steals are strictly increasing.
#[test]
fn stampede() {
    const THREADS: usize = 8;
    const COUNT: usize = 50_000;

    /// Steals until `remaining` reaches zero, checking that this thread sees
    /// strictly increasing values.
    fn steal_ascending(queue: &Injector<Box<usize>>, remaining: &AtomicUsize) {
        let mut previous = 0;
        while remaining.load(SeqCst) > 0 {
            if let Success(boxed) = queue.steal() {
                let value = *boxed;
                assert!(previous < value);
                previous = value;
                remaining.fetch_sub(1, SeqCst);
            }
        }
    }

    let injector = Injector::new();

    (1..=COUNT).for_each(|n| injector.push(Box::new(n)));
    let remaining = Arc::new(AtomicUsize::new(COUNT));

    scope(|s| {
        for _ in 0..THREADS {
            let remaining = Arc::clone(&remaining);
            let queue = &injector;

            s.spawn(move |_| steal_ascending(queue, &remaining));
        }

        // The test thread competes for items as well.
        steal_ascending(&injector, &remaining);
    })
    .unwrap();
}
151
/// Worker threads drain the injector through `steal`, `steal_batch` and
/// `steal_batch_and_pop` (plus popping their local FIFO worker queue) while
/// the test thread randomly interleaves pushes with its own steals. The test
/// finishes once the shared hit counter has reached `COUNT`.
#[test]
fn stress() {
    const THREADS: usize = 8;
    const COUNT: usize = 50_000;

    let injector = Injector::new();
    let done = Arc::new(AtomicBool::new(false));
    let hits = Arc::new(AtomicUsize::new(0));

    scope(|s| {
        for _ in 0..THREADS {
            let done = Arc::clone(&done);
            let hits = Arc::clone(&hits);
            let queue = &injector;

            s.spawn(move |_| {
                let local = Worker::new_fifo();

                while !done.load(SeqCst) {
                    if queue.steal().is_success() {
                        hits.fetch_add(1, SeqCst);
                    }

                    // Items moved here are counted when popped below.
                    let _ = queue.steal_batch(&local);

                    if queue.steal_batch_and_pop(&local).is_success() {
                        hits.fetch_add(1, SeqCst);
                    }

                    while local.pop().is_some() {
                        hits.fetch_add(1, SeqCst);
                    }
                }
            });
        }

        // Steal directly from the injector until a steal does not succeed,
        // counting each stolen item as a hit.
        let drain_into_hits = || {
            while injector.steal().is_success() {
                hits.fetch_add(1, SeqCst);
            }
        };

        let mut rng = rand::thread_rng();
        let mut next = 0;
        while next < COUNT {
            // Two times out of three push the next value; otherwise drain.
            if rng.gen_range(0..3) != 0 {
                injector.push(next);
                next += 1;
            } else {
                drain_into_hits();
            }
        }

        // Everything has been pushed; help until every item is accounted for.
        while hits.load(SeqCst) < COUNT {
            drain_into_hits();
        }
        done.store(true, SeqCst);
    })
    .unwrap();
}
210
/// Keeps feeding the injector in random-sized bursts until the test thread
/// and every worker thread have each recorded at least one successful steal.
#[test]
fn no_starvation() {
    const THREADS: usize = 8;
    const COUNT: usize = 50_000;

    let injector = Injector::new();
    let done = Arc::new(AtomicBool::new(false));
    // One hit counter per worker thread.
    let mut all_hits = Vec::new();

    scope(|s| {
        for _ in 0..THREADS {
            let done = Arc::clone(&done);
            let hits = Arc::new(AtomicUsize::new(0));
            all_hits.push(Arc::clone(&hits));
            let queue = &injector;

            s.spawn(move |_| {
                let local = Worker::new_fifo();

                while !done.load(SeqCst) {
                    if queue.steal().is_success() {
                        hits.fetch_add(1, SeqCst);
                    }

                    // Items moved here are counted when popped below.
                    let _ = queue.steal_batch(&local);

                    if queue.steal_batch_and_pop(&local).is_success() {
                        hits.fetch_add(1, SeqCst);
                    }

                    while local.pop().is_some() {
                        hits.fetch_add(1, SeqCst);
                    }
                }
            });
        }

        let mut rng = rand::thread_rng();
        let mut my_hits = 0;
        loop {
            let burst = rng.gen_range(0..COUNT);
            for i in 0..burst {
                // The random draw happens on every iteration; the test thread
                // only steals while it has not yet scored a hit of its own.
                let try_steal = rng.gen_range(0..3) == 0 && my_hits == 0;
                if try_steal {
                    while injector.steal().is_success() {
                        my_hits += 1;
                    }
                } else {
                    injector.push(i);
                }
            }

            let everyone_scored =
                my_hits > 0 && all_hits.iter().all(|hits| hits.load(SeqCst) > 0);
            if everyone_scored {
                break;
            }
        }
        done.store(true, SeqCst);
    })
    .unwrap();
}
269
/// Verifies element destructors: every stolen element is dropped exactly
/// once while the injector is alive, and dropping the injector drops the
/// elements still queued, which must form a run of consecutive ids.
#[test]
fn destructors() {
    const THREADS: usize = 8;
    const COUNT: usize = 50_000;
    const STEPS: usize = 1000;

    /// Records its id in the shared log when dropped.
    struct Elem {
        id: usize,
        log: Arc<Mutex<Vec<usize>>>,
    }

    impl Drop for Elem {
        fn drop(&mut self) {
            self.log.lock().unwrap().push(self.id);
        }
    }

    let injector = Injector::new();
    let dropped = Arc::new(Mutex::new(Vec::new()));
    let remaining = Arc::new(AtomicUsize::new(COUNT));

    for id in 0..COUNT {
        injector.push(Elem {
            id,
            log: Arc::clone(&dropped),
        });
    }

    scope(|s| {
        for _ in 0..THREADS {
            let remaining = Arc::clone(&remaining);
            let queue = &injector;

            s.spawn(move |_| {
                let local = Worker::new_fifo();
                let mut taken = 0;

                // Each worker stops once it has consumed at least STEPS
                // elements.
                while taken < STEPS {
                    if let Success(_) = queue.steal() {
                        taken += 1;
                        remaining.fetch_sub(1, SeqCst);
                    }

                    // Items moved here are counted when popped below.
                    let _ = queue.steal_batch(&local);

                    if let Success(_) = queue.steal_batch_and_pop(&local) {
                        taken += 1;
                        remaining.fetch_sub(1, SeqCst);
                    }

                    while local.pop().is_some() {
                        taken += 1;
                        remaining.fetch_sub(1, SeqCst);
                    }
                }
            });
        }

        // The test thread makes STEPS steal attempts of its own.
        for _ in 0..STEPS {
            if let Success(_) = injector.steal() {
                remaining.fetch_sub(1, SeqCst);
            }
        }
    })
    .unwrap();

    let rem = remaining.load(SeqCst);
    assert!(rem > 0);

    // Everything consumed so far has been dropped, and nothing else has.
    {
        let mut log = dropped.lock().unwrap();
        assert_eq!(log.len(), COUNT - rem);
        log.clear();
    }

    drop(injector);

    // Dropping the injector drops exactly the leftover elements; once sorted,
    // their ids must be consecutive.
    {
        let mut log = dropped.lock().unwrap();
        assert_eq!(log.len(), rem);
        log.sort_unstable();
        for (lo, hi) in log.iter().zip(log.iter().skip(1)) {
            assert_eq!(lo + 1, *hi);
        }
    }
}
349}