Skip to content

Commit 1efbd21

Browse files
committed
make iterators Send + 'static to support cross-thread usage
1 parent 402b0ed commit 1efbd21

File tree

10 files changed

+401
-28
lines changed

10 files changed

+401
-28
lines changed

src/abstract.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ pub trait AbstractTree {
4949
&self,
5050
seqno: SeqNo,
5151
index: Option<Arc<Memtable>>,
52-
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
52+
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
5353
self.range::<&[u8], _>(.., seqno, index)
5454
}
5555

@@ -61,7 +61,7 @@ pub trait AbstractTree {
6161
prefix: K,
6262
seqno: SeqNo,
6363
index: Option<Arc<Memtable>>,
64-
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_>;
64+
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;
6565

6666
/// Returns an iterator over a range of items.
6767
///
@@ -71,7 +71,7 @@ pub trait AbstractTree {
7171
range: R,
7272
seqno: SeqNo,
7373
index: Option<Arc<Memtable>>,
74-
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_>;
74+
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;
7575

7676
/// Ingests a sorted stream of key-value pairs into the tree.
7777
///

src/blob_tree/mod.rs

Lines changed: 77 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,22 @@ use crate::{
1919
value::InternalValue,
2020
version::Version,
2121
vlog::{Accessor, BlobFile, BlobFileWriter, ValueHandle},
22-
Config, Memtable, SegmentId, SeqNo, SequenceNumberCounter, UserKey, UserValue,
22+
Cache, Config, DescriptorTable, Memtable, SegmentId, SeqNo, SequenceNumberCounter, TreeId,
23+
UserKey, UserValue,
2324
};
2425
use handle::BlobIndirection;
2526
use std::{io::Cursor, ops::RangeBounds, path::PathBuf, sync::Arc};
2627

27-
pub struct Guard<'a> {
28-
blob_tree: &'a BlobTree,
28+
pub struct Guard {
29+
tree_id: TreeId,
30+
blobs_folder: Arc<PathBuf>,
31+
cache: Arc<Cache>,
32+
descriptor_table: Arc<DescriptorTable>,
2933
version: Version,
3034
kv: crate::Result<InternalValue>,
3135
}
3236

33-
impl IterGuard for Guard<'_> {
37+
impl IterGuard for Guard {
3438
fn key(self) -> crate::Result<UserKey> {
3539
self.kv.map(|kv| kv.key.user_key)
3640
}
@@ -49,7 +53,14 @@ impl IterGuard for Guard<'_> {
4953
}
5054

5155
fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
52-
resolve_value_handle(self.blob_tree, &self.version, self.kv?)
56+
resolve_value_handle_owned(
57+
self.tree_id,
58+
self.blobs_folder.as_path(),
59+
&self.cache,
60+
&self.descriptor_table,
61+
&self.version,
62+
self.kv?,
63+
)
5364
}
5465
}
5566

@@ -87,6 +98,47 @@ fn resolve_value_handle(tree: &BlobTree, version: &Version, item: InternalValue)
8798
}
8899
}
89100

101+
fn resolve_value_handle_owned(
102+
tree_id: TreeId,
103+
blobs_folder: &std::path::Path,
104+
cache: &Arc<Cache>,
105+
descriptor_table: &Arc<DescriptorTable>,
106+
version: &Version,
107+
item: InternalValue,
108+
) -> RangeItem {
109+
if item.key.value_type.is_indirection() {
110+
let mut cursor = Cursor::new(item.value);
111+
let vptr = BlobIndirection::decode_from(&mut cursor)?;
112+
113+
// Resolve indirection using value log
114+
match Accessor::new(&version.blob_files).get(
115+
tree_id,
116+
blobs_folder,
117+
&item.key.user_key,
118+
&vptr.vhandle,
119+
cache,
120+
descriptor_table,
121+
) {
122+
Ok(Some(v)) => {
123+
let k = item.key.user_key;
124+
Ok((k, v))
125+
}
126+
Ok(None) => {
127+
panic!(
128+
"value handle ({:?} => {:?}) did not match any blob - this is a bug; version={}",
129+
item.key.user_key, vptr.vhandle,
130+
version.id(),
131+
);
132+
}
133+
Err(e) => Err(e),
134+
}
135+
} else {
136+
let k = item.key.user_key;
137+
let v = item.value;
138+
Ok((k, v))
139+
}
140+
}
141+
90142
/// A key-value-separated log-structured merge tree
91143
///
92144
/// This tree is a composite structure, consisting of an
@@ -180,20 +232,27 @@ impl AbstractTree for BlobTree {
180232
prefix: K,
181233
seqno: SeqNo,
182234
index: Option<Arc<Memtable>>,
183-
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
235+
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
184236
use crate::range::prefix_to_range;
185237

186238
let range = prefix_to_range(prefix.as_ref());
187239

188240
let version = self.current_version();
241+
let tree_id = self.id();
242+
let blobs_folder = Arc::new(self.blobs_folder.clone());
243+
let cache = self.index.config.cache.clone();
244+
let descriptor_table = self.index.config.descriptor_table.clone();
189245

190246
Box::new(
191247
self.index
192248
.create_internal_range(&range, seqno, index)
193249
.map(move |kv| {
194250
IterGuardImpl::Blob(Guard {
195-
blob_tree: self,
196-
version: version.clone(), // TODO: PERF: ugly Arc clone
251+
tree_id,
252+
blobs_folder: Arc::clone(&blobs_folder),
253+
cache: cache.clone(),
254+
descriptor_table: descriptor_table.clone(),
255+
version: version.clone(),
197256
kv,
198257
})
199258
}),
@@ -205,17 +264,23 @@ impl AbstractTree for BlobTree {
205264
range: R,
206265
seqno: SeqNo,
207266
index: Option<Arc<Memtable>>,
208-
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
267+
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
209268
let version = self.current_version();
269+
let tree_id = self.id();
270+
let blobs_folder = Arc::new(self.blobs_folder.clone());
271+
let cache = self.index.config.cache.clone();
272+
let descriptor_table = self.index.config.descriptor_table.clone();
210273

211-
// TODO: PERF: ugly Arc clone
212274
Box::new(
213275
self.index
214276
.create_internal_range(&range, seqno, index)
215277
.map(move |kv| {
216278
IterGuardImpl::Blob(Guard {
217-
blob_tree: self,
218-
version: version.clone(), // TODO: PERF: ugly Arc clone
279+
tree_id,
280+
blobs_folder: Arc::clone(&blobs_folder),
281+
cache: cache.clone(),
282+
descriptor_table: descriptor_table.clone(),
283+
version: version.clone(),
219284
kv,
220285
})
221286
}),

src/iter_guard.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ pub trait IterGuard {
4141
}
4242

4343
#[enum_dispatch(IterGuard)]
44-
pub enum IterGuardImpl<'a> {
44+
pub enum IterGuardImpl {
4545
Standard(StandardGuard),
46-
Blob(BlobGuard<'a>),
46+
Blob(BlobGuard),
4747
}

src/merge.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use interval_heap::IntervalHeap as Heap;
77

88
type IterItem = crate::Result<InternalValue>;
99

10-
pub type BoxedIterator<'a> = Box<dyn DoubleEndedIterator<Item = IterItem> + 'a>;
10+
pub type BoxedIterator<'a> = Box<dyn DoubleEndedIterator<Item = IterItem> + Send + 'a>;
1111

1212
#[derive(Eq)]
1313
struct HeapItem(usize, InternalValue);

src/range.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ pub struct IterState {
7373
pub(crate) version: Version,
7474
}
7575

76-
type BoxedMerge<'a> = Box<dyn DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'a>;
76+
type BoxedMerge<'a> = Box<dyn DoubleEndedIterator<Item = crate::Result<InternalValue>> + Send + 'a>;
7777

7878
// TODO: maybe we can lifetime TreeIter and then use InternalKeyRef everywhere to bound lifetime of iterators (no need to construct InternalKey then, can just use range)
7979
self_cell!(

src/run_reader.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@ pub struct RunReader {
1313
run: Arc<Run<Segment>>,
1414
lo: usize,
1515
hi: usize,
16-
lo_reader: Option<Box<dyn DoubleEndedIterator<Item = crate::Result<InternalValue>>>>,
17-
hi_reader: Option<Box<dyn DoubleEndedIterator<Item = crate::Result<InternalValue>>>>,
16+
lo_reader: Option<BoxedIterator<'static>>,
17+
hi_reader: Option<BoxedIterator<'static>>,
1818
cache_policy: CachePolicy,
1919
}
2020

2121
impl RunReader {
2222
#[must_use]
23-
pub fn new<R: RangeBounds<UserKey> + Clone + 'static>(
23+
pub fn new<R: RangeBounds<UserKey> + Clone + Send + 'static>(
2424
run: Arc<Run<Segment>>,
2525
range: R,
2626
cache_policy: CachePolicy,
@@ -33,7 +33,7 @@ impl RunReader {
3333
}
3434

3535
#[must_use]
36-
pub fn culled<R: RangeBounds<UserKey> + Clone + 'static>(
36+
pub fn culled<R: RangeBounds<UserKey> + Clone + Send + 'static>(
3737
run: Arc<Run<Segment>>,
3838
range: R,
3939
(lo, hi): (Option<usize>, Option<usize>),

src/segment/block_index/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ pub trait BlockIndex {
2323
fn iter(&self) -> Box<dyn BlockIndexIter>;
2424
}
2525

26-
pub trait BlockIndexIter: DoubleEndedIterator<Item = crate::Result<KeyedBlockHandle>> {
26+
pub trait BlockIndexIter:
27+
DoubleEndedIterator<Item = crate::Result<KeyedBlockHandle>> + Send
28+
{
2729
fn seek_lower(&mut self, key: &[u8]) -> bool;
2830
fn seek_upper(&mut self, key: &[u8]) -> bool;
2931
}

src/segment/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,10 +335,10 @@ impl Segment {
335335
#[must_use]
336336
#[allow(clippy::iter_without_into_iter)]
337337
#[doc(hidden)]
338-
pub fn range<R: RangeBounds<UserKey>>(
338+
pub fn range<R: RangeBounds<UserKey> + Send>(
339339
&self,
340340
range: R,
341-
) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> {
341+
) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + Send {
342342
use crate::fallible_clipping_iter::FallibleClippingIter;
343343

344344
let index_iter = self.block_index.iter();

src/tree/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ impl AbstractTree for Tree {
142142
prefix: K,
143143
seqno: SeqNo,
144144
index: Option<Arc<Memtable>>,
145-
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
145+
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
146146
Box::new(
147147
self.create_prefix(&prefix, seqno, index)
148148
.map(|kv| IterGuardImpl::Standard(Guard(kv))),
@@ -154,7 +154,7 @@ impl AbstractTree for Tree {
154154
range: R,
155155
seqno: SeqNo,
156156
index: Option<Arc<Memtable>>,
157-
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
157+
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
158158
Box::new(
159159
self.create_range(&range, seqno, index)
160160
.map(|kv| IterGuardImpl::Standard(Guard(kv))),

0 commit comments

Comments
 (0)