0 | |
From 98ad0f42db9039b508e4fbbb612d4ac5d3f020f4 Mon Sep 17 00:00:00 2001
|
1 | |
From: Josh Stone <cuviper@gmail.com>
|
2 | |
Date: Mon, 10 Sep 2018 16:27:21 -0700
|
3 | |
Subject: [PATCH] Port to crossbeam-deque 0.6
|
4 | |
|
5 | |
---
|
6 | |
Cargo.toml | 2 +-
|
7 | |
rayon-core/Cargo.toml | 4 +---
|
8 | |
rayon-core/src/registry.rs | 47 +++++++++++++++-----------------------
|
9 | |
src/iter/par_bridge.rs | 19 ++++++++-------
|
10 | |
4 files changed, 31 insertions(+), 41 deletions(-)
|
11 | |
|
12 | |
diff --git a/Cargo.toml b/Cargo.toml
|
13 | |
index 14230aa2..97b0ba53 100644
|
14 | |
--- a/Cargo.toml
|
15 | |
+++ b/Cargo.toml
|
16 | |
@@ -47,3 +47,3 @@ path = "tests/simple_panic.rs"
|
17 | |
[dependencies.crossbeam-deque]
|
18 | |
-version = "0.2.0"
|
19 | |
+version = "0.6.0"
|
20 | |
|
21 | |
diff --git a/src/registry.rs b/src/registry.rs
|
22 | |
index 0fb8e203..16f3f01e 100644
|
23 | |
--- a/src/registry.rs
|
24 | |
+++ b/src/registry.rs
|
25 | |
@@ -1,5 +1,5 @@
|
26 | |
use ::{ExitHandler, PanicHandler, StartHandler, ThreadPoolBuilder, ThreadPoolBuildError, ErrorKind};
|
27 | |
-use crossbeam_deque::{Deque, Steal, Stealer};
|
28 | |
+use crossbeam_deque::{self as deque, Worker, Pop, Steal, Stealer};
|
29 | |
use job::{JobRef, StackJob};
|
30 | |
#[cfg(rayon_unstable)]
|
31 | |
use job::Job;
|
32 | |
@@ -46,7 +46,7 @@ pub struct Registry {
|
33 | |
}
|
34 | |
|
35 | |
struct RegistryState {
|
36 | |
- job_injector: Deque<JobRef>,
|
37 | |
+ job_injector: Worker<JobRef>,
|
38 | |
}
|
39 | |
|
40 | |
/// ////////////////////////////////////////////////////////////////////////
|
41 | |
@@ -100,12 +100,10 @@ impl Registry {
|
42 | |
let n_threads = builder.get_num_threads();
|
43 | |
let breadth_first = builder.get_breadth_first();
|
44 | |
|
45 | |
- let inj_worker = Deque::new();
|
46 | |
- let inj_stealer = inj_worker.stealer();
|
47 | |
- let workers: Vec<_> = (0..n_threads)
|
48 | |
- .map(|_| Deque::new())
|
49 | |
- .collect();
|
50 | |
- let stealers: Vec<_> = workers.iter().map(|d| d.stealer()).collect();
|
51 | |
+ let (inj_worker, inj_stealer) = deque::fifo();
|
52 | |
+ let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads)
|
53 | |
+ .map(|_| if breadth_first { deque::fifo() } else { deque::lifo() })
|
54 | |
+ .unzip();
|
55 | |
|
56 | |
let registry = Arc::new(Registry {
|
57 | |
thread_infos: stealers.into_iter()
|
58 | |
@@ -132,7 +130,7 @@ impl Registry {
|
59 | |
if let Some(stack_size) = builder.get_stack_size() {
|
60 | |
b = b.stack_size(stack_size);
|
61 | |
}
|
62 | |
- if let Err(e) = b.spawn(move || unsafe { main_loop(worker, registry, index, breadth_first) }) {
|
63 | |
+ if let Err(e) = b.spawn(move || unsafe { main_loop(worker, registry, index) }) {
|
64 | |
return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e)))
|
65 | |
}
|
66 | |
}
|
67 | |
@@ -417,7 +415,7 @@ pub struct RegistryId {
|
68 | |
}
|
69 | |
|
70 | |
impl RegistryState {
|
71 | |
- pub fn new(job_injector: Deque<JobRef>) -> RegistryState {
|
72 | |
+ pub fn new(job_injector: Worker<JobRef>) -> RegistryState {
|
73 | |
RegistryState {
|
74 | |
job_injector: job_injector,
|
75 | |
}
|
76 | |
@@ -453,13 +451,10 @@ impl ThreadInfo {
|
77 | |
|
78 | |
pub struct WorkerThread {
|
79 | |
/// the "worker" half of our local deque
|
80 | |
- worker: Deque<JobRef>,
|
81 | |
+ worker: Worker<JobRef>,
|
82 | |
|
83 | |
index: usize,
|
84 | |
|
85 | |
- /// are these workers configured to steal breadth-first or not?
|
86 | |
- breadth_first: bool,
|
87 | |
-
|
88 | |
/// A weak random number generator.
|
89 | |
rng: XorShift64Star,
|
90 | |
|
91 | |
@@ -513,7 +508,7 @@ impl WorkerThread {
|
92 | |
|
93 | |
#[inline]
|
94 | |
pub fn local_deque_is_empty(&self) -> bool {
|
95 | |
- self.worker.len() == 0
|
96 | |
+ self.worker.is_empty()
|
97 | |
}
|
98 | |
|
99 | |
/// Attempts to obtain a "local" job -- typically this means
|
100 | |
@@ -522,15 +517,11 @@ impl WorkerThread {
|
101 | |
/// bottom.
|
102 | |
#[inline]
|
103 | |
pub unsafe fn take_local_job(&self) -> Option<JobRef> {
|
104 | |
- if !self.breadth_first {
|
105 | |
- self.worker.pop()
|
106 | |
- } else {
|
107 | |
- loop {
|
108 | |
- match self.worker.steal() {
|
109 | |
- Steal::Empty => return None,
|
110 | |
- Steal::Data(d) => return Some(d),
|
111 | |
- Steal::Retry => {},
|
112 | |
- }
|
113 | |
+ loop {
|
114 | |
+ match self.worker.pop() {
|
115 | |
+ Pop::Empty => return None,
|
116 | |
+ Pop::Data(d) => return Some(d),
|
117 | |
+ Pop::Retry => {},
|
118 | |
}
|
119 | |
}
|
120 | |
}
|
121 | |
@@ -596,7 +587,7 @@ impl WorkerThread {
|
122 | |
/// local work to do.
|
123 | |
unsafe fn steal(&self) -> Option<JobRef> {
|
124 | |
// we only steal when we don't have any work to do locally
|
125 | |
- debug_assert!(self.worker.pop().is_none());
|
126 | |
+ debug_assert!(self.local_deque_is_empty());
|
127 | |
|
128 | |
// otherwise, try to steal
|
129 | |
let num_threads = self.registry.thread_infos.len();
|
130 | |
@@ -630,13 +621,11 @@ impl WorkerThread {
|
131 | |
|
132 | |
/// ////////////////////////////////////////////////////////////////////////
|
133 | |
|
134 | |
-unsafe fn main_loop(worker: Deque<JobRef>,
|
135 | |
+unsafe fn main_loop(worker: Worker<JobRef>,
|
136 | |
registry: Arc<Registry>,
|
137 | |
- index: usize,
|
138 | |
- breadth_first: bool) {
|
139 | |
+ index: usize) {
|
140 | |
let worker_thread = WorkerThread {
|
141 | |
worker: worker,
|
142 | |
- breadth_first: breadth_first,
|
143 | |
index: index,
|
144 | |
rng: XorShift64Star::new(),
|
145 | |
registry: registry.clone(),
|