weak_memory.rs
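
//! Weak memory emulation via per-location store buffers. Each atomic object in an
//! allocation owns a bounded `StoreBuffer` recording past stores in modification
//! order; an atomic load may read from any store element that the coherence and SC
//! rules in `fetch_store` still permit, which is how outdated values can be observed.
//! The design follows the operational semantics referred to below as "the paper" and
//! its tsan11 implementation (linked in `store_impl`).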

use std::{
    cell::{Ref, RefCell, RefMut},
    collections::VecDeque,
};

use rustc_const_eval::interpret::{AllocRange, InterpResult, ScalarMaybeUninit};
use rustc_data_structures::fx::FxHashMap;
use rustc_target::abi::Size;

use crate::{
    data_race::{GlobalState, ThreadClockSet},
    RangeMap, Tag, VClock, VTimestamp, VectorIdx,
};

pub type AllocExtra = StoreBufferAlloc;

#[derive(Debug, Clone)]
pub struct StoreBufferAlloc {
    /// Store buffer of each atomic object in this allocation
    store_buffer: RefCell<RangeMap<StoreBuffer>>,
}

impl StoreBufferAlloc {
    pub fn new_allocation(len: Size) -> Self {
        Self { store_buffer: RefCell::new(RangeMap::new(len, StoreBuffer::default())) }
    }

    /// Gets a store buffer associated with an atomic object in this allocation
    fn get_store_buffer(&self, range: AllocRange) -> Ref<'_, StoreBuffer> {
        Ref::map(self.store_buffer.borrow(), |range_map| {
            let (.., store_buffer) = range_map.iter(range.start, range.size).next().unwrap();
            store_buffer
        })
    }

    /// Gets a mutable store buffer associated with an atomic object in this allocation
    fn get_store_buffer_mut(&self, range: AllocRange) -> RefMut<'_, StoreBuffer> {
        RefMut::map(self.store_buffer.borrow_mut(), |range_map| {
            let (.., store_buffer) = range_map.iter_mut(range.start, range.size).next().unwrap();
            store_buffer
        })
    }

    /// Reads from the last store in modification order
    pub fn read_from_last_store<'tcx>(&self, range: AllocRange, global: &GlobalState) {
        let store_buffer = self.get_store_buffer(range);
        let store_elem = store_buffer.buffer.back();
        if let Some(store_elem) = store_elem {
            let (index, clocks) = global.current_thread_state();
            store_elem.load_impl(index, &clocks);
        }
    }

    pub fn buffered_read<'tcx>(
        &self,
        range: AllocRange,
        global: &GlobalState,
        is_seqcst: bool,
        rng: &mut (impl rand::Rng + ?Sized),
        validate: impl FnOnce() -> InterpResult<'tcx>,
    ) -> InterpResult<'tcx, Option<ScalarMaybeUninit<Tag>>> {
        // Having a live borrow to store_buffer while calling validate_atomic_load is fine
        // because the race detector doesn't touch store_buffer
        let store_buffer = self.get_store_buffer(range);

        let store_elem = {
            // The `clocks` we got here must be dropped before calling validate_atomic_load
            // as the race detector will update it
            let (.., clocks) = global.current_thread_state();
            // Load from a valid entry in the store buffer
            store_buffer.fetch_store(is_seqcst, &clocks, &mut *rng)
        };

        // Unlike in write_scalar_atomic, thread clock updates have to be done
        // after we've picked a store element from the store buffer, as presented
        // in the ATOMIC LOAD rule of the paper. This is because fetch_store
        // requires access to ThreadClockSet.clock, which is updated by the race detector
        validate()?;

        let loaded = store_elem.map(|store_elem| {
            let (index, clocks) = global.current_thread_state();
            store_elem.load_impl(index, &clocks)
        });
        Ok(loaded)
    }

    pub fn buffered_write<'tcx>(
        &mut self,
        val: ScalarMaybeUninit<Tag>,
        range: AllocRange,
        global: &GlobalState,
        is_seqcst: bool,
    ) -> InterpResult<'tcx> {
        let (index, clocks) = global.current_thread_state();
        let mut store_buffer = self.get_store_buffer_mut(range);
        store_buffer.store_impl(val, index, &clocks.clock, is_seqcst);
        Ok(())
    }
}

const STORE_BUFFER_LIMIT: usize = 128;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StoreBuffer {
    // Stores to this location in modification order
    buffer: VecDeque<StoreElement>,
}

impl Default for StoreBuffer {
    fn default() -> Self {
        let mut buffer = VecDeque::new();
        buffer.reserve(STORE_BUFFER_LIMIT);
        Self { buffer }
    }
}

impl<'mir, 'tcx: 'mir> StoreBuffer {
    /// Selects a valid store element in the buffer.
    /// The buffer does not contain the value used to initialise the atomic object,
    /// so a fresh atomic object has an empty store buffer until an explicit store.
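    ///
    /// As a rough illustration of the CoWR rule below (an assumed scenario, not taken
    /// from the paper): if thread A stores 1 and then 2 to the same atomic object, a later
    /// load in thread A (or in any thread that has synchronized with A after the second
    /// store) finds that the store of 2 happens-before it, so the search stops there and
    /// the load can no longer pick the store of 1.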
    fn fetch_store<R: rand::Rng + ?Sized>(
        &self,
        is_seqcst: bool,
        clocks: &ThreadClockSet,
        rng: &mut R,
    ) -> Option<&StoreElement> {
        use rand::seq::IteratorRandom;
        let mut found_sc = false;
        // FIXME: this should be an inclusive take_while (stops after a false predicate, but
        // includes the element that gave the false), but such a function doesn't yet
        // exist in the standard library https://github.com/rust-lang/rust/issues/62208
        let mut keep_searching = true;
        let candidates = self
            .buffer
            .iter()
            .rev()
            .take_while(move |&store_elem| {
                if !keep_searching {
                    return false;
                }
                // CoWR: if a store in modification order happens-before the current load,
                // then we can't read-from anything earlier in modification order.
                if store_elem.timestamp <= clocks.clock[store_elem.store_index] {
                    log::info!("Stopped due to coherent write-read");
                    keep_searching = false;
                    return true;
                }

                // CoRR: if there was a load from this store which happened-before the current load,
                // then we cannot read-from anything earlier in modification order.
                if store_elem.loads.borrow().iter().any(|(&load_index, &load_timestamp)| {
                    load_timestamp <= clocks.clock[load_index]
                }) {
                    log::info!("Stopped due to coherent read-read");
                    keep_searching = false;
                    return true;
                }

                // The current load, which may be sequenced-after an SC fence, can only read-from
                // the last store sequenced-before an SC fence in another thread (or any stores
                // later than that SC fence)
                if store_elem.timestamp <= clocks.fence_seqcst[store_elem.store_index] {
                    log::info!("Stopped due to coherent load sequenced after sc fence");
                    keep_searching = false;
                    return true;
                }

                // The current non-SC load can only read-from the latest SC store (or any stores
                // later than that SC store)
                if store_elem.timestamp <= clocks.write_seqcst[store_elem.store_index]
                    && store_elem.is_seqcst
                {
                    log::info!("Stopped due to needing to load from the last SC store");
                    keep_searching = false;
                    return true;
                }

                // The current SC load can only read-from the last store sequenced-before
                // the last SC fence (or any stores later than the SC fence)
                if is_seqcst && store_elem.timestamp <= clocks.read_seqcst[store_elem.store_index] {
                    log::info!(
                        "Stopped due to sc load needing to load from the last SC store before an SC fence"
                    );
                    keep_searching = false;
                    return true;
                }

                true
            })
            .filter(|&store_elem| {
                // Skip over all but the last SC store
                let include = !(store_elem.is_seqcst && found_sc);
                found_sc |= store_elem.is_seqcst;
                include
            });
        candidates.choose(rng)
    }

    /// ATOMIC STORE IMPL in the paper
    /// The paper also wants the variable's clock (AtomicMemoryCellClocks::sync_vector in our
    /// code) to be in the store element, but it's not used anywhere so we don't actually need it
    fn store_impl(
        &mut self,
        val: ScalarMaybeUninit<Tag>,
        index: VectorIdx,
        thread_clock: &VClock,
        is_seqcst: bool,
    ) {
        let store_elem = StoreElement {
            store_index: index,
            timestamp: thread_clock[index],
            // In the language provided in the paper, an atomic store takes the value from a
            // non-atomic memory location.
            // But we already have the immediate value here so we don't need to do the memory
            // access
            val,
            is_seqcst,
            loads: RefCell::new(FxHashMap::default()),
        };
        self.buffer.push_back(store_elem);
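        // Keep the buffer bounded: once the limit is exceeded, the oldest store element
        // is dropped and can no longer be read from.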
        if self.buffer.len() > STORE_BUFFER_LIMIT {
            self.buffer.pop_front();
        }
        if is_seqcst {
            // Every store that happens-before this one needs to be marked as SC,
            // so that in fetch_store only the last SC store (i.e. this one), or stores that
            // aren't ordered by happens-before with the last SC store, can be picked.
            //
            // This is described in the paper (§4.5) and implemented in tsan11
            // (https://github.com/ChrisLidbury/tsan11/blob/ecbd6b81e9b9454e01cba78eb9d88684168132c7/lib/tsan/rtl/tsan_relaxed.cc#L160-L167)
            // but absent in the operational semantics.
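            // For instance (derived from the check below, not stated in the paper): the
            // storing thread's own earlier stores to this location always satisfy
            // `elem.timestamp <= thread_clock[elem.store_index]`, so they are all flagged
            // as SC here and will be skipped by fetch_store once this newer SC store has
            // been seen.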
            self.buffer.iter_mut().rev().for_each(|elem| {
                if elem.timestamp <= thread_clock[elem.store_index] {
                    elem.is_seqcst = true;
                }
            })
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StoreElement {
    /// The identifier of the vector index, corresponding to a thread
    /// that performed the store.
    store_index: VectorIdx,

    /// Whether this store is SC.
    is_seqcst: bool,

    /// The timestamp of the storing thread when it performed the store
    timestamp: VTimestamp,

    /// The value of this store
    val: ScalarMaybeUninit<Tag>,

    /// Latest timestamp of each thread that has loaded from this store element
    /// Behind a RefCell so that load operations can take &self
    loads: RefCell<FxHashMap<VectorIdx, VTimestamp>>,
}

impl StoreElement {
    /// ATOMIC LOAD IMPL in the paper
    /// Unlike the operational semantics in the paper, we don't need to keep track
    /// of the thread timestamp for every single load. Keeping track of the latest
    /// timestamp of each thread that has loaded from a store is sufficient to
    /// determine if any previous load happens-before the current one while calling
    /// fetch_store
    fn load_impl(&self, index: VectorIdx, clocks: &ThreadClockSet) -> ScalarMaybeUninit<Tag> {
        self.loads.borrow_mut().insert(index, clocks.clock[index]);
        self.val
    }
}
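
// A rough usage sketch (assumed caller-side code, not part of this file): the
// interpreter would call `buffered_write` when executing an atomic store and
// `buffered_read` when executing an atomic load, passing the data-race validation
// in as the `validate` closure. Names such as `alloc_extra`, `range`, `global`,
// `rng` and `validate_atomic_load` are placeholders.
//
//     // Atomic store: append the value to this location's store buffer.
//     alloc_extra.buffered_write(val, range, &global, /* is_seqcst */ false)?;
//
//     // Atomic load: run race detection, then pick a store element this load is
//     // allowed to read from (possibly an outdated one).
//     let loaded = alloc_extra.buffered_read(range, &global, /* is_seqcst */ false,
//         &mut rng, || validate_atomic_load())?;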