diff --git a/core/Cargo.toml b/core/Cargo.toml index 05c2f831532b..2f7d360d11d9 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -192,7 +192,7 @@ services-oss = [ services-pcloud = [] services-persy = ["dep:persy", "internal-tokio-rt"] services-postgresql = ["dep:sqlx", "sqlx?/postgres"] -services-redb = ["dep:redb", "internal-tokio-rt"] +services-redb = ["dep:redb", "internal-tokio-rt", "dep:flume"] services-redis = ["dep:redis", "dep:bb8", "redis?/tokio-rustls-comp"] services-redis-native-tls = ["services-redis", "redis?/tokio-native-tls-comp"] services-rocksdb = ["dep:rocksdb", "internal-tokio-rt"] diff --git a/core/src/services/redb/backend.rs b/core/src/services/redb/backend.rs index d51dae1af2c4..b507887b770d 100644 --- a/core/src/services/redb/backend.rs +++ b/core/src/services/redb/backend.rs @@ -16,12 +16,11 @@ // under the License. use std::fmt::Debug; -use std::fmt::Formatter; use std::sync::Arc; use tokio::task; -use crate::raw::adapters::kv; +use crate::raw::oio::HierarchyLister; use crate::raw::*; use crate::services::RedbConfig; use crate::Builder; @@ -30,6 +29,13 @@ use crate::ErrorKind; use crate::Scheme; use crate::*; +use super::core::RedbCore; +use super::deleter::RedbDeleter; +use super::error::*; +use super::lister::RedbFilter; +use super::lister::RedbLister; +use super::writer::RedbWriter; + impl Configurator for RedbConfig { type Builder = RedbBuilder; fn into_builder(self) -> Self::Builder { @@ -123,192 +129,137 @@ impl Builder for RedbBuilder { (Some(datadir), db) }; - create_table(&db, &table_name)?; - - Ok(RedbBackend::new(Adapter { + let core = RedbCore { datadir, table: table_name, + root: self.config.root.unwrap_or_else(|| "/".into()), db, - }) - .with_root(self.config.root.as_deref().unwrap_or_default())) - } -} - -/// Backend for Redb services. -pub type RedbBackend = kv::Backend; - -#[derive(Clone)] -pub struct Adapter { - datadir: Option, - table: String, - db: Arc, -} + }; + core.create_table()?; -impl Debug for Adapter { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let mut ds = f.debug_struct("Adapter"); - ds.field("path", &self.datadir); - ds.finish() + Ok(RedbBackend { core: core.into() }) } } -impl kv::Adapter for Adapter { - type Scanner = (); +#[derive(Debug, Clone)] +pub struct RedbBackend { + core: Arc, +} - fn info(&self) -> kv::Info { - kv::Info::new( - Scheme::Redb, - &self.table, - Capability { +impl Access for RedbBackend { + type Reader = Buffer; + type Writer = RedbWriter; + type Lister = HierarchyLister; + type Deleter = oio::OneShotDeleter; + type BlockingReader = Buffer; + type BlockingWriter = RedbWriter; + type BlockingLister = HierarchyLister; + type BlockingDeleter = oio::OneShotDeleter; + + fn info(&self) -> Arc { + let am = AccessorInfo::default(); + am.set_scheme(Scheme::Redb) + .set_root(&self.core.root) + .set_name(&self.core.table) + .set_native_capability(Capability { read: true, + stat: true, + write: true, + write_can_empty: true, + delete: true, + + list: true, + blocking: true, shared: false, ..Default::default() - }, - ) - } - - async fn get(&self, path: &str) -> Result> { - let cloned_self = self.clone(); - let cloned_path = path.to_string(); - - task::spawn_blocking(move || cloned_self.blocking_get(cloned_path.as_str())) - .await - .map_err(new_task_join_error) - .and_then(|inner_result| inner_result) - } - - fn blocking_get(&self, path: &str) -> Result> { - let read_txn = self.db.begin_read().map_err(parse_transaction_error)?; - - let table_define: redb::TableDefinition<&str, &[u8]> = - redb::TableDefinition::new(&self.table); + }); - let table = read_txn - .open_table(table_define) - .map_err(parse_table_error)?; - - let result = match table.get(path) { - Ok(Some(v)) => Ok(Some(v.value().to_vec())), - Ok(None) => Ok(None), - Err(e) => Err(parse_storage_error(e)), - }?; - Ok(result.map(Buffer::from)) + am.into() } - async fn set(&self, path: &str, value: Buffer) -> Result<()> { + async fn stat(&self, path: &str, args: OpStat) -> Result { let cloned_self = self.clone(); let cloned_path = path.to_string(); - task::spawn_blocking(move || cloned_self.blocking_set(cloned_path.as_str(), value)) + task::spawn_blocking(move || cloned_self.blocking_stat(cloned_path.as_str(), args)) .await .map_err(new_task_join_error) .and_then(|inner_result| inner_result) } - fn blocking_set(&self, path: &str, value: Buffer) -> Result<()> { - let write_txn = self.db.begin_write().map_err(parse_transaction_error)?; - - let table_define: redb::TableDefinition<&str, &[u8]> = - redb::TableDefinition::new(&self.table); - - { - let mut table = write_txn - .open_table(table_define) - .map_err(parse_table_error)?; + fn blocking_stat(&self, path: &str, _: OpStat) -> Result { + let p = build_abs_path(&self.core.root, path); - table - .insert(path, &*value.to_vec()) - .map_err(parse_storage_error)?; + if p == build_abs_path(&self.core.root, "") { + Ok(RpStat::new(Metadata::new(EntryMode::DIR))) + } else { + let bs = self.core.get(&p)?; + match bs { + Some(bs) => Ok(RpStat::new( + Metadata::new(EntryMode::from_path(&p)).with_content_length(bs.len() as u64), + )), + None => Err(Error::new(ErrorKind::NotFound, "kv doesn't have this path")), + } } - - write_txn.commit().map_err(parse_commit_error)?; - Ok(()) } - async fn delete(&self, path: &str) -> Result<()> { + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { let cloned_self = self.clone(); let cloned_path = path.to_string(); - task::spawn_blocking(move || cloned_self.blocking_delete(cloned_path.as_str())) + task::spawn_blocking(move || cloned_self.blocking_read(cloned_path.as_str(), args)) .await .map_err(new_task_join_error) .and_then(|inner_result| inner_result) } - fn blocking_delete(&self, path: &str) -> Result<()> { - let write_txn = self.db.begin_write().map_err(parse_transaction_error)?; - - let table_define: redb::TableDefinition<&str, &[u8]> = - redb::TableDefinition::new(&self.table); - - { - let mut table = write_txn - .open_table(table_define) - .map_err(parse_table_error)?; - - table.remove(path).map_err(parse_storage_error)?; - } + fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { + let p = build_abs_path(&self.core.root, path); + let bs = match self.core.get(&p)? { + Some(bs) => Buffer::from(bs), + None => return Err(Error::new(ErrorKind::NotFound, "kv doesn't have this path")), + }; + Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize()))) + } - write_txn.commit().map_err(parse_commit_error)?; - Ok(()) + async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + self.blocking_write(path, args) } -} -fn parse_transaction_error(e: redb::TransactionError) -> Error { - Error::new(ErrorKind::Unexpected, "error from redb").set_source(e) -} + fn blocking_write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { + let p = build_abs_path(&self.core.root, path); -fn parse_table_error(e: redb::TableError) -> Error { - match e { - redb::TableError::TableDoesNotExist(_) => { - Error::new(ErrorKind::NotFound, "error from redb").set_source(e) - } - _ => Error::new(ErrorKind::Unexpected, "error from redb").set_source(e), + Ok((RpWrite::new(), RedbWriter::new(self.core.clone(), p))) } -} -fn parse_storage_error(e: redb::StorageError) -> Error { - Error::new(ErrorKind::Unexpected, "error from redb").set_source(e) -} + async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { + self.blocking_delete() + } -fn parse_database_error(e: redb::DatabaseError) -> Error { - Error::new(ErrorKind::Unexpected, "error from redb").set_source(e) -} + fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> { + Ok(( + RpDelete::default(), + oio::OneShotDeleter::new(RedbDeleter::new(self.core.clone())), + )) + } -fn parse_commit_error(e: redb::CommitError) -> Error { - Error::new(ErrorKind::Unexpected, "error from redb").set_source(e) -} + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { + let pattern = build_abs_path(&self.core.root, path); + let range = self.core.iter()?; + let lister = RedbLister::new(RedbFilter::new(range, pattern)); + let lister = HierarchyLister::new(lister, path, args.recursive()); -/// Check if a table exists, otherwise create it. -fn create_table(db: &redb::Database, table: &str) -> Result<()> { - // Only one `WriteTransaction` is permitted at same time, - // applying new one will block until it available. - // - // So we first try checking table existence via `ReadTransaction`. - { - let read_txn = db.begin_read().map_err(parse_transaction_error)?; - - let table_define: redb::TableDefinition<&str, &[u8]> = redb::TableDefinition::new(table); - - match read_txn.open_table(table_define) { - Ok(_) => return Ok(()), - Err(redb::TableError::TableDoesNotExist(_)) => (), - Err(e) => return Err(parse_table_error(e)), - } + Ok((RpList::default(), lister)) } - { - let write_txn = db.begin_write().map_err(parse_transaction_error)?; + fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { + let pattern = build_abs_path(&self.core.root, path); + let range = self.core.iter()?; + let lister = RedbFilter::new(range, pattern); + let lister = HierarchyLister::new(lister, path, args.recursive()); - let table_define: redb::TableDefinition<&str, &[u8]> = redb::TableDefinition::new(table); - - write_txn - .open_table(table_define) - .map_err(parse_table_error)?; - write_txn.commit().map_err(parse_commit_error)?; + Ok((RpList::default(), lister)) } - - Ok(()) } diff --git a/core/src/services/redb/core.rs b/core/src/services/redb/core.rs new file mode 100644 index 000000000000..f9ceaf127890 --- /dev/null +++ b/core/src/services/redb/core.rs @@ -0,0 +1,117 @@ +use std::sync::Arc; + +use crate::Result; + +use super::error::*; + +#[derive(Debug)] +pub struct RedbCore { + #[allow(dead_code)] + pub(super) datadir: Option, + pub(super) table: String, + pub(super) root: String, + pub(super) db: Arc, +} + +impl RedbCore { + /// Check if a table exists, otherwise create it. + pub fn create_table(&self) -> Result<()> { + // Only one `WriteTransaction` is permitted at same time, + // applying new one will block until it available. + // + // So we first try checking table existence via `ReadTransaction`. + { + let read_txn = self.db.begin_read().map_err(parse_transaction_error)?; + + let table_define: redb::TableDefinition<&str, &[u8]> = + redb::TableDefinition::new(&self.table); + + match read_txn.open_table(table_define) { + Ok(_) => return Ok(()), + Err(redb::TableError::TableDoesNotExist(_)) => (), + Err(e) => return Err(parse_table_error(e)), + } + } + + { + let write_txn = self.db.begin_write().map_err(parse_transaction_error)?; + + let table_define: redb::TableDefinition<&str, &[u8]> = + redb::TableDefinition::new(&self.table); + + write_txn + .open_table(table_define) + .map_err(parse_table_error)?; + write_txn.commit().map_err(parse_commit_error)?; + } + + Ok(()) + } + + pub fn get(&self, key: &str) -> Result>> { + let read_txn = self.db.begin_read().map_err(parse_transaction_error)?; + + let table_define: redb::TableDefinition<&str, &[u8]> = + redb::TableDefinition::new(&self.table); + + let table = read_txn + .open_table(table_define) + .map_err(parse_table_error)?; + + let result = match table.get(key) { + Ok(Some(v)) => Ok(Some(v.value().to_vec())), + Ok(None) => Ok(None), + Err(e) => Err(parse_storage_error(e)), + }?; + Ok(result) + } + + pub fn set(&self, key: &str, value: &[u8]) -> Result<()> { + let write_txn = self.db.begin_write().map_err(parse_transaction_error)?; + + let table_define: redb::TableDefinition<&str, &[u8]> = + redb::TableDefinition::new(&self.table); + + { + let mut table = write_txn + .open_table(table_define) + .map_err(parse_table_error)?; + + table.insert(key, value).map_err(parse_storage_error)?; + } + + write_txn.commit().map_err(parse_commit_error)?; + Ok(()) + } + + pub fn delete(&self, key: &str) -> Result<()> { + let write_txn = self.db.begin_write().map_err(parse_transaction_error)?; + + let table_define: redb::TableDefinition<&str, &[u8]> = + redb::TableDefinition::new(&self.table); + + { + let mut table = write_txn + .open_table(table_define) + .map_err(parse_table_error)?; + + table.remove(key).map_err(parse_storage_error)?; + } + + write_txn.commit().map_err(parse_commit_error)?; + Ok(()) + } + + pub fn iter(&self) -> Result> { + let read_txn = self.db.begin_read().map_err(parse_transaction_error)?; + + let table_define: redb::TableDefinition<&str, &[u8]> = + redb::TableDefinition::new(&self.table); + + let table = read_txn + .open_table(table_define) + .map_err(parse_table_error)?; + + table.range::<&str>(..).map_err(parse_storage_error) + } +} diff --git a/core/src/services/redb/deleter.rs b/core/src/services/redb/deleter.rs new file mode 100644 index 000000000000..5bd42bd107ff --- /dev/null +++ b/core/src/services/redb/deleter.rs @@ -0,0 +1,42 @@ +use std::sync::Arc; + +use tokio::task; + +use crate::raw::build_abs_path; +use crate::raw::new_task_join_error; +use crate::raw::oio; +use crate::raw::OpDelete; +use crate::Result; + +use super::core::RedbCore; + +pub struct RedbDeleter { + core: Arc, +} + +impl RedbDeleter { + pub fn new(core: Arc) -> Self { + RedbDeleter { core } + } +} + +impl oio::OneShotDelete for RedbDeleter { + async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> { + let p = build_abs_path(&self.core.root, &path); + let core = self.core.clone(); + + task::spawn_blocking(move || core.delete(&p)) + .await + .map_err(new_task_join_error) + .and_then(|inner_result| inner_result) + } +} + +impl oio::BlockingOneShotDelete for RedbDeleter { + fn blocking_delete_once(&self, path: String, _: OpDelete) -> Result<()> { + let p = build_abs_path(&self.core.root, &path); + + self.core.delete(&p)?; + Ok(()) + } +} diff --git a/core/src/services/redb/docs.md b/core/src/services/redb/docs.md index a964edb28b18..33e4c7e188dd 100644 --- a/core/src/services/redb/docs.md +++ b/core/src/services/redb/docs.md @@ -9,7 +9,7 @@ This service can be used to: - [x] delete - [x] copy - [x] rename -- [ ] ~~list~~ +- [x] list - [ ] ~~presign~~ - [x] blocking diff --git a/core/src/services/redb/error.rs b/core/src/services/redb/error.rs new file mode 100644 index 000000000000..b879a068d4a1 --- /dev/null +++ b/core/src/services/redb/error.rs @@ -0,0 +1,26 @@ +use crate::{Error, ErrorKind}; + +pub fn parse_transaction_error(e: redb::TransactionError) -> Error { + Error::new(ErrorKind::Unexpected, "error from redb").set_source(e) +} + +pub fn parse_table_error(e: redb::TableError) -> Error { + match e { + redb::TableError::TableDoesNotExist(_) => { + Error::new(ErrorKind::NotFound, "error from redb").set_source(e) + } + _ => Error::new(ErrorKind::Unexpected, "error from redb").set_source(e), + } +} + +pub fn parse_storage_error(e: redb::StorageError) -> Error { + Error::new(ErrorKind::Unexpected, "error from redb").set_source(e) +} + +pub fn parse_database_error(e: redb::DatabaseError) -> Error { + Error::new(ErrorKind::Unexpected, "error from redb").set_source(e) +} + +pub fn parse_commit_error(e: redb::CommitError) -> Error { + Error::new(ErrorKind::Unexpected, "error from redb").set_source(e) +} diff --git a/core/src/services/redb/lister.rs b/core/src/services/redb/lister.rs new file mode 100644 index 000000000000..aa5c61c35014 --- /dev/null +++ b/core/src/services/redb/lister.rs @@ -0,0 +1,89 @@ +use tokio::task; + +use crate::raw::oio; +use crate::EntryMode; +use crate::Metadata; +use crate::Result; + +use super::error::parse_storage_error; + +#[derive(Debug)] +pub struct RedbLister { + receiver: flume::Receiver>, +} + +impl RedbLister { + pub fn new(mut filter: RedbFilter) -> Self { + let (tx, rx) = flume::bounded(1); + + task::spawn_blocking(move || loop { + let Some(result) = filter.range.next() else { + break; + }; + + let (key, value) = match result { + Ok(pair) => pair, + Err(e) => { + let e = parse_storage_error(e); + if tx.send(Err(e)).is_err() { + break; + } + continue; + } + }; + + let key = key.value(); + let size = value.value().len() as u64; + if key.starts_with(&filter.pattern) { + let mode = EntryMode::from_path(key); + let entry = oio::Entry::new(key, Metadata::new(mode).with_content_length(size)); + if tx.send(Ok(entry)).is_err() { + break; + } + } + }); + + Self { receiver: rx } + } +} + +impl oio::List for RedbLister { + async fn next(&mut self) -> Result> { + match self.receiver.recv_async().await { + Ok(entry) => entry.map(Some), + Err(_) => Ok(None), + } + } +} + +pub struct RedbFilter { + range: redb::Range<'static, &'static str, &'static [u8]>, + pattern: String, +} + +impl RedbFilter { + pub fn new(range: redb::Range<'static, &'static str, &'static [u8]>, pattern: String) -> Self { + Self { range, pattern } + } +} + +impl oio::BlockingList for RedbFilter { + fn next(&mut self) -> Result> { + loop { + let Some(result) = self.range.next() else { + return Ok(None); + }; + + let (key, value) = result.map_err(parse_storage_error)?; + let key = key.value(); + let size = value.value().len() as u64; + if key.starts_with(&self.pattern) { + let mode = EntryMode::from_path(key); + return Ok(Some(oio::Entry::new( + key, + Metadata::new(mode).with_content_length(size), + ))); + } + } + } +} diff --git a/core/src/services/redb/mod.rs b/core/src/services/redb/mod.rs index 1c44bcd062f7..0c6ff4cb7afe 100644 --- a/core/src/services/redb/mod.rs +++ b/core/src/services/redb/mod.rs @@ -15,6 +15,17 @@ // specific language governing permissions and limitations // under the License. +#[cfg(feature = "services-redb")] +mod core; +#[cfg(feature = "services-redb")] +mod deleter; +#[cfg(feature = "services-redb")] +mod error; +#[cfg(feature = "services-redb")] +mod lister; +#[cfg(feature = "services-redb")] +mod writer; + #[cfg(feature = "services-redb")] mod backend; #[cfg(feature = "services-redb")] diff --git a/core/src/services/redb/writer.rs b/core/src/services/redb/writer.rs new file mode 100644 index 000000000000..c168337bc84b --- /dev/null +++ b/core/src/services/redb/writer.rs @@ -0,0 +1,81 @@ +use std::sync::Arc; + +use tokio::task; + +use crate::raw::new_task_join_error; +use crate::raw::oio::{self, QueueBuf}; +use crate::Buffer; +use crate::EntryMode; +use crate::Metadata; +use crate::Result; + +use super::core::RedbCore; + +pub struct RedbWriter { + core: Arc, + path: String, + buffer: QueueBuf, +} + +impl RedbWriter { + pub fn new(core: Arc, path: String) -> Self { + RedbWriter { + core, + path, + buffer: QueueBuf::new(), + } + } +} + +/// # Safety +/// +/// We will only take `&mut Self` reference for KvWriter. +unsafe impl Sync for RedbWriter {} + +impl oio::Write for RedbWriter { + async fn write(&mut self, bs: Buffer) -> Result<()> { + self.buffer.push(bs); + Ok(()) + } + + async fn close(&mut self) -> Result { + let buf = self.buffer.clone().collect(); + + let core = self.core.clone(); + let cloned_path = self.path.clone(); + + task::spawn_blocking(move || { + let value = buf.to_vec(); + core.set(&cloned_path, &value)?; + + let meta = Metadata::new(EntryMode::from_path(&cloned_path)) + .with_content_length(value.len() as _); + Ok(meta) + }) + .await + .map_err(new_task_join_error) + .and_then(|inner_result| inner_result) + } + + async fn abort(&mut self) -> Result<()> { + self.buffer.clear(); + Ok(()) + } +} + +impl oio::BlockingWrite for RedbWriter { + fn write(&mut self, bs: Buffer) -> Result<()> { + self.buffer.push(bs); + Ok(()) + } + + fn close(&mut self) -> Result { + let buf = self.buffer.clone().collect(); + let value = buf.to_vec(); + self.core.set(&self.path, &value)?; + + let meta = + Metadata::new(EntryMode::from_path(&self.path)).with_content_length(value.len() as _); + Ok(meta) + } +}