Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ services-oss = [
services-pcloud = []
services-persy = ["dep:persy", "internal-tokio-rt"]
services-postgresql = ["dep:sqlx", "sqlx?/postgres"]
services-redb = ["dep:redb", "internal-tokio-rt"]
services-redb = ["dep:redb", "internal-tokio-rt", "dep:flume"]
services-redis = ["dep:redis", "dep:bb8", "redis?/tokio-rustls-comp"]
services-redis-native-tls = ["services-redis", "redis?/tokio-native-tls-comp"]
services-rocksdb = ["dep:rocksdb", "internal-tokio-rt"]
Expand Down
233 changes: 92 additions & 141 deletions core/src/services/redb/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@
// under the License.

use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

use tokio::task;

use crate::raw::adapters::kv;
use crate::raw::oio::HierarchyLister;
use crate::raw::*;
use crate::services::RedbConfig;
use crate::Builder;
Expand All @@ -30,6 +29,13 @@ use crate::ErrorKind;
use crate::Scheme;
use crate::*;

use super::core::RedbCore;
use super::deleter::RedbDeleter;
use super::error::*;
use super::lister::RedbFilter;
use super::lister::RedbLister;
use super::writer::RedbWriter;

impl Configurator for RedbConfig {
type Builder = RedbBuilder;
fn into_builder(self) -> Self::Builder {
Expand Down Expand Up @@ -123,192 +129,137 @@ impl Builder for RedbBuilder {
(Some(datadir), db)
};

create_table(&db, &table_name)?;

Ok(RedbBackend::new(Adapter {
let core = RedbCore {
datadir,
table: table_name,
root: self.config.root.unwrap_or_else(|| "/".into()),
db,
})
.with_root(self.config.root.as_deref().unwrap_or_default()))
}
}

/// Backend for Redb services.
pub type RedbBackend = kv::Backend<Adapter>;

#[derive(Clone)]
pub struct Adapter {
datadir: Option<String>,
table: String,
db: Arc<redb::Database>,
}
};
core.create_table()?;

impl Debug for Adapter {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut ds = f.debug_struct("Adapter");
ds.field("path", &self.datadir);
ds.finish()
Ok(RedbBackend { core: core.into() })
}
}

impl kv::Adapter for Adapter {
type Scanner = ();
#[derive(Debug, Clone)]
pub struct RedbBackend {
core: Arc<RedbCore>,
}

fn info(&self) -> kv::Info {
kv::Info::new(
Scheme::Redb,
&self.table,
Capability {
impl Access for RedbBackend {
type Reader = Buffer;
type Writer = RedbWriter;
type Lister = HierarchyLister<RedbLister>;
type Deleter = oio::OneShotDeleter<RedbDeleter>;
type BlockingReader = Buffer;
type BlockingWriter = RedbWriter;
type BlockingLister = HierarchyLister<RedbFilter>;
type BlockingDeleter = oio::OneShotDeleter<RedbDeleter>;

fn info(&self) -> Arc<AccessorInfo> {
let am = AccessorInfo::default();
am.set_scheme(Scheme::Redb)
.set_root(&self.core.root)
.set_name(&self.core.table)
.set_native_capability(Capability {
read: true,
stat: true,

write: true,
write_can_empty: true,
delete: true,

list: true,

blocking: true,
shared: false,
..Default::default()
},
)
}

async fn get(&self, path: &str) -> Result<Option<Buffer>> {
let cloned_self = self.clone();
let cloned_path = path.to_string();

task::spawn_blocking(move || cloned_self.blocking_get(cloned_path.as_str()))
.await
.map_err(new_task_join_error)
.and_then(|inner_result| inner_result)
}

fn blocking_get(&self, path: &str) -> Result<Option<Buffer>> {
let read_txn = self.db.begin_read().map_err(parse_transaction_error)?;

let table_define: redb::TableDefinition<&str, &[u8]> =
redb::TableDefinition::new(&self.table);
});

let table = read_txn
.open_table(table_define)
.map_err(parse_table_error)?;

let result = match table.get(path) {
Ok(Some(v)) => Ok(Some(v.value().to_vec())),
Ok(None) => Ok(None),
Err(e) => Err(parse_storage_error(e)),
}?;
Ok(result.map(Buffer::from))
am.into()
}

async fn set(&self, path: &str, value: Buffer) -> Result<()> {
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
let cloned_self = self.clone();
let cloned_path = path.to_string();

task::spawn_blocking(move || cloned_self.blocking_set(cloned_path.as_str(), value))
task::spawn_blocking(move || cloned_self.blocking_stat(cloned_path.as_str(), args))
.await
.map_err(new_task_join_error)
.and_then(|inner_result| inner_result)
}

fn blocking_set(&self, path: &str, value: Buffer) -> Result<()> {
let write_txn = self.db.begin_write().map_err(parse_transaction_error)?;

let table_define: redb::TableDefinition<&str, &[u8]> =
redb::TableDefinition::new(&self.table);

{
let mut table = write_txn
.open_table(table_define)
.map_err(parse_table_error)?;
fn blocking_stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
let p = build_abs_path(&self.core.root, path);

table
.insert(path, &*value.to_vec())
.map_err(parse_storage_error)?;
if p == build_abs_path(&self.core.root, "") {
Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
} else {
let bs = self.core.get(&p)?;
match bs {
Some(bs) => Ok(RpStat::new(
Metadata::new(EntryMode::from_path(&p)).with_content_length(bs.len() as u64),
)),
None => Err(Error::new(ErrorKind::NotFound, "kv doesn't have this path")),
}
}

write_txn.commit().map_err(parse_commit_error)?;
Ok(())
}

async fn delete(&self, path: &str) -> Result<()> {
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
let cloned_self = self.clone();
let cloned_path = path.to_string();

task::spawn_blocking(move || cloned_self.blocking_delete(cloned_path.as_str()))
task::spawn_blocking(move || cloned_self.blocking_read(cloned_path.as_str(), args))
.await
.map_err(new_task_join_error)
.and_then(|inner_result| inner_result)
}

fn blocking_delete(&self, path: &str) -> Result<()> {
let write_txn = self.db.begin_write().map_err(parse_transaction_error)?;

let table_define: redb::TableDefinition<&str, &[u8]> =
redb::TableDefinition::new(&self.table);

{
let mut table = write_txn
.open_table(table_define)
.map_err(parse_table_error)?;

table.remove(path).map_err(parse_storage_error)?;
}
fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
let p = build_abs_path(&self.core.root, path);
let bs = match self.core.get(&p)? {
Some(bs) => Buffer::from(bs),
None => return Err(Error::new(ErrorKind::NotFound, "kv doesn't have this path")),
};
Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
}

write_txn.commit().map_err(parse_commit_error)?;
Ok(())
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
self.blocking_write(path, args)
}
}

fn parse_transaction_error(e: redb::TransactionError) -> Error {
Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
}
fn blocking_write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
let p = build_abs_path(&self.core.root, path);

fn parse_table_error(e: redb::TableError) -> Error {
match e {
redb::TableError::TableDoesNotExist(_) => {
Error::new(ErrorKind::NotFound, "error from redb").set_source(e)
}
_ => Error::new(ErrorKind::Unexpected, "error from redb").set_source(e),
Ok((RpWrite::new(), RedbWriter::new(self.core.clone(), p)))
}
}

fn parse_storage_error(e: redb::StorageError) -> Error {
Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
}
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
self.blocking_delete()
}

fn parse_database_error(e: redb::DatabaseError) -> Error {
Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
}
fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
Ok((
RpDelete::default(),
oio::OneShotDeleter::new(RedbDeleter::new(self.core.clone())),
))
}

fn parse_commit_error(e: redb::CommitError) -> Error {
Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
let pattern = build_abs_path(&self.core.root, path);
let range = self.core.iter()?;
let lister = RedbLister::new(RedbFilter::new(range, pattern));
let lister = HierarchyLister::new(lister, path, args.recursive());

/// Check if a table exists, otherwise create it.
fn create_table(db: &redb::Database, table: &str) -> Result<()> {
// Only one `WriteTransaction` is permitted at same time,
// applying new one will block until it available.
//
// So we first try checking table existence via `ReadTransaction`.
{
let read_txn = db.begin_read().map_err(parse_transaction_error)?;

let table_define: redb::TableDefinition<&str, &[u8]> = redb::TableDefinition::new(table);

match read_txn.open_table(table_define) {
Ok(_) => return Ok(()),
Err(redb::TableError::TableDoesNotExist(_)) => (),
Err(e) => return Err(parse_table_error(e)),
}
Ok((RpList::default(), lister))
}

{
let write_txn = db.begin_write().map_err(parse_transaction_error)?;
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
let pattern = build_abs_path(&self.core.root, path);
let range = self.core.iter()?;
let lister = RedbFilter::new(range, pattern);
let lister = HierarchyLister::new(lister, path, args.recursive());

let table_define: redb::TableDefinition<&str, &[u8]> = redb::TableDefinition::new(table);

write_txn
.open_table(table_define)
.map_err(parse_table_error)?;
write_txn.commit().map_err(parse_commit_error)?;
Ok((RpList::default(), lister))
}

Ok(())
}
Loading
Loading