Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/abstract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub trait AbstractTree {
&self,
seqno: SeqNo,
index: Option<Arc<Memtable>>,
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
self.range::<&[u8], _>(.., seqno, index)
}

Expand All @@ -61,7 +61,7 @@ pub trait AbstractTree {
prefix: K,
seqno: SeqNo,
index: Option<Arc<Memtable>>,
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_>;
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;

/// Returns an iterator over a range of items.
///
Expand All @@ -71,7 +71,7 @@ pub trait AbstractTree {
range: R,
seqno: SeqNo,
index: Option<Arc<Memtable>>,
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_>;
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;

/// Ingests a sorted stream of key-value pairs into the tree.
///
Expand Down
82 changes: 60 additions & 22 deletions src/blob_tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,26 @@ use crate::{
value::InternalValue,
version::Version,
vlog::{Accessor, BlobFile, BlobFileWriter, ValueHandle},
Config, Memtable, SegmentId, SeqNo, SequenceNumberCounter, UserKey, UserValue,
Cache, Config, DescriptorTable, Memtable, SegmentId, SeqNo, SequenceNumberCounter, TreeId,
UserKey, UserValue,
};
use handle::BlobIndirection;
use std::{io::Cursor, ops::RangeBounds, path::PathBuf, sync::Arc};

pub struct Guard<'a> {
blob_tree: &'a BlobTree,
struct IterShared {
tree_id: TreeId,
blobs_folder: Arc<PathBuf>,
cache: Arc<Cache>,
descriptor_table: Arc<DescriptorTable>,
version: Version,
}

pub struct Guard {
shared: Arc<IterShared>,
kv: crate::Result<InternalValue>,
}

impl IterGuard for Guard<'_> {
impl IterGuard for Guard {
fn key(self) -> crate::Result<UserKey> {
self.kv.map(|kv| kv.key.user_key)
}
Expand All @@ -49,23 +57,37 @@ impl IterGuard for Guard<'_> {
}

fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
resolve_value_handle(self.blob_tree, &self.version, self.kv?)
resolve_value_handle(
self.shared.tree_id,
self.shared.blobs_folder.as_path(),
&self.shared.cache,
&self.shared.descriptor_table,
&self.shared.version,
self.kv?,
)
}
}

fn resolve_value_handle(tree: &BlobTree, version: &Version, item: InternalValue) -> RangeItem {
fn resolve_value_handle(
tree_id: TreeId,
blobs_folder: &std::path::Path,
cache: &Arc<Cache>,
descriptor_table: &Arc<DescriptorTable>,
version: &Version,
item: InternalValue,
) -> RangeItem {
if item.key.value_type.is_indirection() {
let mut cursor = Cursor::new(item.value);
let vptr = BlobIndirection::decode_from(&mut cursor)?;

// Resolve indirection using value log
match Accessor::new(&version.blob_files).get(
tree.id(),
&tree.blobs_folder,
tree_id,
blobs_folder,
&item.key.user_key,
&vptr.vhandle,
&tree.index.config.cache,
&tree.index.config.descriptor_table,
cache,
descriptor_table,
) {
Ok(Some(v)) => {
let k = item.key.user_key;
Expand Down Expand Up @@ -98,7 +120,7 @@ pub struct BlobTree {
#[doc(hidden)]
pub index: crate::Tree,

blobs_folder: PathBuf,
blobs_folder: Arc<PathBuf>,
}

impl BlobTree {
Expand All @@ -124,7 +146,7 @@ impl BlobTree {

Ok(Self {
index,
blobs_folder,
blobs_folder: Arc::new(blobs_folder),
})
}
}
Expand Down Expand Up @@ -180,20 +202,25 @@ impl AbstractTree for BlobTree {
prefix: K,
seqno: SeqNo,
index: Option<Arc<Memtable>>,
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
use crate::range::prefix_to_range;

let range = prefix_to_range(prefix.as_ref());

let version = self.current_version();
let shared = Arc::new(IterShared {
tree_id: self.id(),
blobs_folder: Arc::clone(&self.blobs_folder),
cache: self.index.config.cache.clone(),
descriptor_table: self.index.config.descriptor_table.clone(),
version: self.current_version(),
});

Box::new(
self.index
.create_internal_range(&range, seqno, index)
.map(move |kv| {
IterGuardImpl::Blob(Guard {
blob_tree: self,
version: version.clone(), // TODO: PERF: ugly Arc clone
shared: Arc::clone(&shared),
kv,
})
}),
Expand All @@ -205,17 +232,21 @@ impl AbstractTree for BlobTree {
range: R,
seqno: SeqNo,
index: Option<Arc<Memtable>>,
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
let version = self.current_version();
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
let shared = Arc::new(IterShared {
tree_id: self.id(),
blobs_folder: Arc::clone(&self.blobs_folder),
cache: self.index.config.cache.clone(),
descriptor_table: self.index.config.descriptor_table.clone(),
version: self.current_version(),
});

// TODO: PERF: ugly Arc clone
Box::new(
self.index
.create_internal_range(&range, seqno, index)
.map(move |kv| {
IterGuardImpl::Blob(Guard {
blob_tree: self,
version: version.clone(), // TODO: PERF: ugly Arc clone
shared: Arc::clone(&shared),
kv,
})
}),
Expand Down Expand Up @@ -609,7 +640,14 @@ impl AbstractTree for BlobTree {
};

let version = self.current_version();
let (_, v) = resolve_value_handle(self, &version, item)?;
let (_, v) = resolve_value_handle(
self.id(),
self.blobs_folder.as_path(),
&self.index.config.cache,
&self.index.config.descriptor_table,
&version,
item,
)?;

Ok(Some(v))
}
Expand Down
4 changes: 2 additions & 2 deletions src/iter_guard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub trait IterGuard {
}

#[enum_dispatch(IterGuard)]
pub enum IterGuardImpl<'a> {
pub enum IterGuardImpl {
Standard(StandardGuard),
Blob(BlobGuard<'a>),
Blob(BlobGuard),
}
2 changes: 1 addition & 1 deletion src/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use interval_heap::IntervalHeap as Heap;

type IterItem = crate::Result<InternalValue>;

pub type BoxedIterator<'a> = Box<dyn DoubleEndedIterator<Item = IterItem> + 'a>;
pub type BoxedIterator<'a> = Box<dyn DoubleEndedIterator<Item = IterItem> + Send + 'a>;

#[derive(Eq)]
struct HeapItem(usize, InternalValue);
Expand Down
2 changes: 1 addition & 1 deletion src/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub struct IterState {
pub(crate) version: Version,
}

type BoxedMerge<'a> = Box<dyn DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'a>;
type BoxedMerge<'a> = Box<dyn DoubleEndedIterator<Item = crate::Result<InternalValue>> + Send + 'a>;

// TODO: maybe we can lifetime TreeIter and then use InternalKeyRef everywhere to bound lifetime of iterators (no need to construct InternalKey then, can just use range)
self_cell!(
Expand Down
8 changes: 4 additions & 4 deletions src/run_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ pub struct RunReader {
run: Arc<Run<Segment>>,
lo: usize,
hi: usize,
lo_reader: Option<Box<dyn DoubleEndedIterator<Item = crate::Result<InternalValue>>>>,
hi_reader: Option<Box<dyn DoubleEndedIterator<Item = crate::Result<InternalValue>>>>,
lo_reader: Option<BoxedIterator<'static>>,
hi_reader: Option<BoxedIterator<'static>>,
}

impl RunReader {
#[must_use]
pub fn new<R: RangeBounds<UserKey> + Clone + 'static>(
pub fn new<R: RangeBounds<UserKey> + Clone + Send + 'static>(
run: Arc<Run<Segment>>,
range: R,
) -> Option<Self> {
Expand All @@ -31,7 +31,7 @@ impl RunReader {
}

#[must_use]
pub fn culled<R: RangeBounds<UserKey> + Clone + 'static>(
pub fn culled<R: RangeBounds<UserKey> + Clone + Send + 'static>(
run: Arc<Run<Segment>>,
range: R,
(lo, hi): (Option<usize>, Option<usize>),
Expand Down
4 changes: 3 additions & 1 deletion src/segment/block_index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ pub trait BlockIndex {
fn iter(&self) -> Box<dyn BlockIndexIter>;
}

pub trait BlockIndexIter: DoubleEndedIterator<Item = crate::Result<KeyedBlockHandle>> {
pub trait BlockIndexIter:
DoubleEndedIterator<Item = crate::Result<KeyedBlockHandle>> + Send
{
fn seek_lower(&mut self, key: &[u8]) -> bool;
fn seek_upper(&mut self, key: &[u8]) -> bool;
}
Expand Down
4 changes: 2 additions & 2 deletions src/segment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,10 @@ impl Segment {
#[must_use]
#[allow(clippy::iter_without_into_iter)]
#[doc(hidden)]
pub fn range<R: RangeBounds<UserKey>>(
pub fn range<R: RangeBounds<UserKey> + Send>(
&self,
range: R,
) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> {
) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + Send {
use crate::fallible_clipping_iter::FallibleClippingIter;

let index_iter = self.block_index.iter();
Expand Down
4 changes: 2 additions & 2 deletions src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl AbstractTree for Tree {
prefix: K,
seqno: SeqNo,
index: Option<Arc<Memtable>>,
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
Box::new(
self.create_prefix(&prefix, seqno, index)
.map(|kv| IterGuardImpl::Standard(Guard(kv))),
Expand All @@ -152,7 +152,7 @@ impl AbstractTree for Tree {
range: R,
seqno: SeqNo,
index: Option<Arc<Memtable>>,
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
Box::new(
self.create_range(&range, seqno, index)
.map(|kv| IterGuardImpl::Standard(Guard(kv))),
Expand Down
Loading