0 | |
--- a/Cargo.toml
|
1 | |
+++ b/Cargo.toml
|
2 | |
@@ -25,3 +25,3 @@ repository = "https://github.com/rayon-r
|
3 | |
[dependencies.crossbeam-deque]
|
4 | |
-version = "0.2.0"
|
5 | |
+version = "0.6.1"
|
6 | |
|
7 | |
--- a/src/iter/par_bridge.rs
|
8 | |
+++ b/src/iter/par_bridge.rs
|
9 | |
@@ -1,4 +1,4 @@
|
10 | |
-use crossbeam_deque::{Deque, Stealer, Steal};
|
11 | |
+use crossbeam_deque::{self as deque, Worker, Stealer, Steal};
|
12 | |
|
13 | |
use std::thread::yield_now;
|
14 | |
use std::sync::{Mutex, TryLockError};
|
15 | |
@@ -78,10 +78,9 @@ impl<Iter: Iterator + Send> ParallelIterator for IterBridge<Iter>
|
16 | |
where C: UnindexedConsumer<Self::Item>
|
17 | |
{
|
18 | |
let split_count = AtomicUsize::new(current_num_threads());
|
19 | |
- let deque = Deque::new();
|
20 | |
- let stealer = deque.stealer();
|
21 | |
+ let (worker, stealer) = deque::fifo();
|
22 | |
let done = AtomicBool::new(false);
|
23 | |
- let iter = Mutex::new((self.iter, deque));
|
24 | |
+ let iter = Mutex::new((self.iter, worker));
|
25 | |
|
26 | |
bridge_unindexed(IterParallelProducer {
|
27 | |
split_count: &split_count,
|
28 | |
@@ -95,7 +94,7 @@ impl<Iter: Iterator + Send> ParallelIterator for IterBridge<Iter>
|
29 | |
struct IterParallelProducer<'a, Iter: Iterator + 'a> {
|
30 | |
split_count: &'a AtomicUsize,
|
31 | |
done: &'a AtomicBool,
|
32 | |
- iter: &'a Mutex<(Iter, Deque<Iter::Item>)>,
|
33 | |
+ iter: &'a Mutex<(Iter, Worker<Iter::Item>)>,
|
34 | |
items: Stealer<Iter::Item>,
|
35 | |
}
|
36 | |
|
37 | |
@@ -159,11 +158,15 @@ impl<'a, Iter: Iterator + Send + 'a> UnindexedProducer for IterParallelProducer<
|
38 | |
let count = current_num_threads();
|
39 | |
let count = (count * count) * 2;
|
40 | |
|
41 | |
- let (ref mut iter, ref deque) = *guard;
|
42 | |
+ let (ref mut iter, ref worker) = *guard;
|
43 | |
|
44 | |
- while deque.len() < count {
|
45 | |
+ // while worker.len() < count {
|
46 | |
+ // TODO the new deque doesn't let us count items. We can just
|
47 | |
+ // push a number of items, but that doesn't consider active
|
48 | |
+ // stealers elsewhere.
|
49 | |
+ for _ in 0..count {
|
50 | |
if let Some(it) = iter.next() {
|
51 | |
- deque.push(it);
|
52 | |
+ worker.push(it);
|
53 | |
} else {
|
54 | |
self.done.store(true, Ordering::SeqCst);
|
55 | |
break;
|