Skip to content

Commit 2c78492

Browse files
committed
make iterators Send + 'static to support cross-thread usage
1 parent 402b0ed commit 2c78492

File tree

10 files changed

+406
-33
lines changed

10 files changed

+406
-33
lines changed

src/abstract.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ pub trait AbstractTree {
4949
&self,
5050
seqno: SeqNo,
5151
index: Option<Arc<Memtable>>,
52-
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
52+
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
5353
self.range::<&[u8], _>(.., seqno, index)
5454
}
5555

@@ -61,7 +61,7 @@ pub trait AbstractTree {
6161
prefix: K,
6262
seqno: SeqNo,
6363
index: Option<Arc<Memtable>>,
64-
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_>;
64+
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;
6565

6666
/// Returns an iterator over a range of items.
6767
///
@@ -71,7 +71,7 @@ pub trait AbstractTree {
7171
range: R,
7272
seqno: SeqNo,
7373
index: Option<Arc<Memtable>>,
74-
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_>;
74+
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;
7575

7676
/// Ingests a sorted stream of key-value pairs into the tree.
7777
///

src/blob_tree/mod.rs

Lines changed: 82 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,26 @@ use crate::{
1919
value::InternalValue,
2020
version::Version,
2121
vlog::{Accessor, BlobFile, BlobFileWriter, ValueHandle},
22-
Config, Memtable, SegmentId, SeqNo, SequenceNumberCounter, UserKey, UserValue,
22+
Cache, Config, DescriptorTable, Memtable, SegmentId, SeqNo, SequenceNumberCounter, TreeId,
23+
UserKey, UserValue,
2324
};
2425
use handle::BlobIndirection;
2526
use std::{io::Cursor, ops::RangeBounds, path::PathBuf, sync::Arc};
2627

27-
pub struct Guard<'a> {
28-
blob_tree: &'a BlobTree,
28+
struct IterShared {
29+
tree_id: TreeId,
30+
blobs_folder: Arc<PathBuf>,
31+
cache: Arc<Cache>,
32+
descriptor_table: Arc<DescriptorTable>,
2933
version: Version,
34+
}
35+
36+
pub struct Guard {
37+
shared: Arc<IterShared>,
3038
kv: crate::Result<InternalValue>,
3139
}
3240

33-
impl IterGuard for Guard<'_> {
41+
impl IterGuard for Guard {
3442
fn key(self) -> crate::Result<UserKey> {
3543
self.kv.map(|kv| kv.key.user_key)
3644
}
@@ -49,7 +57,14 @@ impl IterGuard for Guard<'_> {
4957
}
5058

5159
fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
52-
resolve_value_handle(self.blob_tree, &self.version, self.kv?)
60+
resolve_value_handle_owned(
61+
self.shared.tree_id,
62+
self.shared.blobs_folder.as_path(),
63+
&self.shared.cache,
64+
&self.shared.descriptor_table,
65+
&self.shared.version,
66+
self.kv?,
67+
)
5368
}
5469
}
5570

@@ -61,7 +76,7 @@ fn resolve_value_handle(tree: &BlobTree, version: &Version, item: InternalValue)
6176
// Resolve indirection using value log
6277
match Accessor::new(&version.blob_files).get(
6378
tree.id(),
64-
&tree.blobs_folder,
79+
tree.blobs_folder.as_path(),
6580
&item.key.user_key,
6681
&vptr.vhandle,
6782
&tree.index.config.cache,
@@ -87,6 +102,47 @@ fn resolve_value_handle(tree: &BlobTree, version: &Version, item: InternalValue)
87102
}
88103
}
89104

105+
fn resolve_value_handle_owned(
106+
tree_id: TreeId,
107+
blobs_folder: &std::path::Path,
108+
cache: &Arc<Cache>,
109+
descriptor_table: &Arc<DescriptorTable>,
110+
version: &Version,
111+
item: InternalValue,
112+
) -> RangeItem {
113+
if item.key.value_type.is_indirection() {
114+
let mut cursor = Cursor::new(item.value);
115+
let vptr = BlobIndirection::decode_from(&mut cursor)?;
116+
117+
// Resolve indirection using value log
118+
match Accessor::new(&version.blob_files).get(
119+
tree_id,
120+
blobs_folder,
121+
&item.key.user_key,
122+
&vptr.vhandle,
123+
cache,
124+
descriptor_table,
125+
) {
126+
Ok(Some(v)) => {
127+
let k = item.key.user_key;
128+
Ok((k, v))
129+
}
130+
Ok(None) => {
131+
panic!(
132+
"value handle ({:?} => {:?}) did not match any blob - this is a bug; version={}",
133+
item.key.user_key, vptr.vhandle,
134+
version.id(),
135+
);
136+
}
137+
Err(e) => Err(e),
138+
}
139+
} else {
140+
let k = item.key.user_key;
141+
let v = item.value;
142+
Ok((k, v))
143+
}
144+
}
145+
90146
/// A key-value-separated log-structured merge tree
91147
///
92148
/// This tree is a composite structure, consisting of an
@@ -98,7 +154,7 @@ pub struct BlobTree {
98154
#[doc(hidden)]
99155
pub index: crate::Tree,
100156

101-
blobs_folder: PathBuf,
157+
blobs_folder: Arc<PathBuf>,
102158
}
103159

104160
impl BlobTree {
@@ -124,7 +180,7 @@ impl BlobTree {
124180

125181
Ok(Self {
126182
index,
127-
blobs_folder,
183+
blobs_folder: Arc::new(blobs_folder),
128184
})
129185
}
130186
}
@@ -180,20 +236,25 @@ impl AbstractTree for BlobTree {
180236
prefix: K,
181237
seqno: SeqNo,
182238
index: Option<Arc<Memtable>>,
183-
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
239+
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
184240
use crate::range::prefix_to_range;
185241

186242
let range = prefix_to_range(prefix.as_ref());
187243

188-
let version = self.current_version();
244+
let shared = Arc::new(IterShared {
245+
tree_id: self.id(),
246+
blobs_folder: Arc::clone(&self.blobs_folder),
247+
cache: self.index.config.cache.clone(),
248+
descriptor_table: self.index.config.descriptor_table.clone(),
249+
version: self.current_version(),
250+
});
189251

190252
Box::new(
191253
self.index
192254
.create_internal_range(&range, seqno, index)
193255
.map(move |kv| {
194256
IterGuardImpl::Blob(Guard {
195-
blob_tree: self,
196-
version: version.clone(), // TODO: PERF: ugly Arc clone
257+
shared: Arc::clone(&shared),
197258
kv,
198259
})
199260
}),
@@ -205,17 +266,21 @@ impl AbstractTree for BlobTree {
205266
range: R,
206267
seqno: SeqNo,
207268
index: Option<Arc<Memtable>>,
208-
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
209-
let version = self.current_version();
269+
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
270+
let shared = Arc::new(IterShared {
271+
tree_id: self.id(),
272+
blobs_folder: Arc::clone(&self.blobs_folder),
273+
cache: self.index.config.cache.clone(),
274+
descriptor_table: self.index.config.descriptor_table.clone(),
275+
version: self.current_version(),
276+
});
210277

211-
// TODO: PERF: ugly Arc clone
212278
Box::new(
213279
self.index
214280
.create_internal_range(&range, seqno, index)
215281
.map(move |kv| {
216282
IterGuardImpl::Blob(Guard {
217-
blob_tree: self,
218-
version: version.clone(), // TODO: PERF: ugly Arc clone
283+
shared: Arc::clone(&shared),
219284
kv,
220285
})
221286
}),

src/iter_guard.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ pub trait IterGuard {
4141
}
4242

4343
#[enum_dispatch(IterGuard)]
44-
pub enum IterGuardImpl<'a> {
44+
pub enum IterGuardImpl {
4545
Standard(StandardGuard),
46-
Blob(BlobGuard<'a>),
46+
Blob(BlobGuard),
4747
}

src/merge.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use interval_heap::IntervalHeap as Heap;
77

88
type IterItem = crate::Result<InternalValue>;
99

10-
pub type BoxedIterator<'a> = Box<dyn DoubleEndedIterator<Item = IterItem> + 'a>;
10+
pub type BoxedIterator<'a> = Box<dyn DoubleEndedIterator<Item = IterItem> + Send + 'a>;
1111

1212
#[derive(Eq)]
1313
struct HeapItem(usize, InternalValue);

src/range.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ pub struct IterState {
7373
pub(crate) version: Version,
7474
}
7575

76-
type BoxedMerge<'a> = Box<dyn DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'a>;
76+
type BoxedMerge<'a> = Box<dyn DoubleEndedIterator<Item = crate::Result<InternalValue>> + Send + 'a>;
7777

7878
// TODO: maybe we can lifetime TreeIter and then use InternalKeyRef everywhere to bound lifetime of iterators (no need to construct InternalKey then, can just use range)
7979
self_cell!(

src/run_reader.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@ pub struct RunReader {
1313
run: Arc<Run<Segment>>,
1414
lo: usize,
1515
hi: usize,
16-
lo_reader: Option<Box<dyn DoubleEndedIterator<Item = crate::Result<InternalValue>>>>,
17-
hi_reader: Option<Box<dyn DoubleEndedIterator<Item = crate::Result<InternalValue>>>>,
16+
lo_reader: Option<BoxedIterator<'static>>,
17+
hi_reader: Option<BoxedIterator<'static>>,
1818
cache_policy: CachePolicy,
1919
}
2020

2121
impl RunReader {
2222
#[must_use]
23-
pub fn new<R: RangeBounds<UserKey> + Clone + 'static>(
23+
pub fn new<R: RangeBounds<UserKey> + Clone + Send + 'static>(
2424
run: Arc<Run<Segment>>,
2525
range: R,
2626
cache_policy: CachePolicy,
@@ -33,7 +33,7 @@ impl RunReader {
3333
}
3434

3535
#[must_use]
36-
pub fn culled<R: RangeBounds<UserKey> + Clone + 'static>(
36+
pub fn culled<R: RangeBounds<UserKey> + Clone + Send + 'static>(
3737
run: Arc<Run<Segment>>,
3838
range: R,
3939
(lo, hi): (Option<usize>, Option<usize>),

src/segment/block_index/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ pub trait BlockIndex {
2323
fn iter(&self) -> Box<dyn BlockIndexIter>;
2424
}
2525

26-
pub trait BlockIndexIter: DoubleEndedIterator<Item = crate::Result<KeyedBlockHandle>> {
26+
pub trait BlockIndexIter:
27+
DoubleEndedIterator<Item = crate::Result<KeyedBlockHandle>> + Send
28+
{
2729
fn seek_lower(&mut self, key: &[u8]) -> bool;
2830
fn seek_upper(&mut self, key: &[u8]) -> bool;
2931
}

src/segment/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,10 +335,10 @@ impl Segment {
335335
#[must_use]
336336
#[allow(clippy::iter_without_into_iter)]
337337
#[doc(hidden)]
338-
pub fn range<R: RangeBounds<UserKey>>(
338+
pub fn range<R: RangeBounds<UserKey> + Send>(
339339
&self,
340340
range: R,
341-
) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> {
341+
) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + Send {
342342
use crate::fallible_clipping_iter::FallibleClippingIter;
343343

344344
let index_iter = self.block_index.iter();

src/tree/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ impl AbstractTree for Tree {
142142
prefix: K,
143143
seqno: SeqNo,
144144
index: Option<Arc<Memtable>>,
145-
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
145+
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
146146
Box::new(
147147
self.create_prefix(&prefix, seqno, index)
148148
.map(|kv| IterGuardImpl::Standard(Guard(kv))),
@@ -154,7 +154,7 @@ impl AbstractTree for Tree {
154154
range: R,
155155
seqno: SeqNo,
156156
index: Option<Arc<Memtable>>,
157-
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
157+
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
158158
Box::new(
159159
self.create_range(&range, seqno, index)
160160
.map(|kv| IterGuardImpl::Standard(Guard(kv))),

0 commit comments

Comments
 (0)