Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 62 additions & 4 deletions crates/core/src/logstore/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
//!
//! Specific pieces of configuration must implement the `TryUpdateKey` trait which
//! defines how to update internal fields based on key-value pairs.
#[cfg(feature = "cloud")]
use ::object_store::RetryConfig;

use object_store::{path::Path, prefix::PrefixStore, ObjectStore};
use std::collections::HashMap;
use std::sync::Arc;

use super::storage::file_cache::{self, FileCacheConfig};
use super::storage::LimitConfig;
use super::{storage::runtime::RuntimeConfig, IORuntime};
use crate::{DeltaResult, DeltaTableError};
Expand Down Expand Up @@ -103,6 +104,11 @@ pub struct StorageConfig {
/// Configuration to limit the number of concurrent requests to the object store.
pub limit: Option<LimitConfig>,

/// File cache configuration.
///
/// Configuration to enable file cache for the Delta Log Store.
pub file_cache: Option<FileCacheConfig>,

/// Properties that are not recognized by the storage configuration.
///
/// These properties are ignored by the storage configuration and can be used for custom purposes.
Expand All @@ -121,12 +127,27 @@ impl StorageConfig {
/// Depending on the configuration, the following layers may be added:
/// - Retry layer: Adds retry logic to the object store.
/// - Limit layer: Limits the number of concurrent requests to the object store.
/// - File cache layer: Adds a file cache to the object store.
pub fn decorate_store<T: ObjectStore + Clone>(
&self,
store: T,
table_root: &url::Url,
) -> DeltaResult<Box<dyn ObjectStore>> {
let inner = Self::decorate_prefix(store, table_root)?;
let inner = self.decorate_root_store(store)?;
let inner = Self::decorate_prefix(inner, table_root)?;
Ok(inner)
}

/// Wraps an object store with additional layers of functionality without prefix.
pub(crate) fn decorate_root_store<T: ObjectStore>(
&self,
store: T,
) -> DeltaResult<Box<dyn ObjectStore>> {
let inner = if let Some(file_cache) = self.file_cache.as_ref() {
file_cache::decorate_store(Arc::new(store), file_cache)?
} else {
Box::new(store)
};
Ok(inner)
}

Expand Down Expand Up @@ -169,7 +190,7 @@ where

#[cfg(feature = "cloud")]
let remainder = {
let result = ParseResult::<RetryConfig>::from_iter(remainder);
let result = ParseResult::<object_store::RetryConfig>::from_iter(remainder);
config.retry = result.config;
result.unparsed
};
Expand Down Expand Up @@ -225,6 +246,11 @@ impl StorageConfig {
remainder
};

let result = ParseResult::<FileCacheConfig>::from_iter(remainder);
result.raise_errors()?;
props.file_cache = (!result.is_default).then_some(result.config);
let remainder = result.unparsed;

props.unknown_properties = remainder;
Ok(props)
}
Expand Down Expand Up @@ -403,4 +429,36 @@ mod tests {
StorageConfig::default().with_io_runtime(IORuntime::Config(RuntimeConfig::default()));
assert!(config.runtime.is_some());
}

#[test]
fn test_file_cache_config_from_options() {
let options = hashmap! {
"file_cache_path".to_string() => "/tmp/file_cache".to_string(),
};

let (file_cache_config, remainder): (FileCacheConfig, _) =
super::try_parse_impl(options).unwrap();
assert!(remainder.is_empty());

assert_eq!(file_cache_config.path, "/tmp/file_cache");
assert!(file_cache_config.last_checkpoint_valid_duration.is_none());
}

#[test]
fn test_file_cache_config_from_options_with_last_checkpoint_valid_duration() {
let options = hashmap! {
"file_cache_path".to_string() => "/tmp/file_cache".to_string(),
"file_cache_last_checkpoint_valid_duration".to_string() => "1h".to_string(),
};

let (file_cache_config, remainder): (FileCacheConfig, _) =
super::try_parse_impl(options).unwrap();
assert!(remainder.is_empty());

assert_eq!(file_cache_config.path, "/tmp/file_cache");
assert_eq!(
file_cache_config.last_checkpoint_valid_duration,
Some(Duration::from_secs(3600))
);
}
}
39 changes: 38 additions & 1 deletion crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ impl<T: LogStoreFactory + ?Sized> LogStoreFactoryExt for T {
location: &Url,
options: &StorageConfig,
) -> DeltaResult<LogStoreRef> {
let prefixed_store = options.decorate_store(root_store.clone(), location)?;
let root_store = Arc::new(options.decorate_root_store(root_store)?) as ObjectStoreRef;
let prefixed_store = StorageConfig::decorate_prefix(root_store.clone(), location)?;
let log_store =
self.with_options(Arc::new(prefixed_store), root_store, location, options)?;
Ok(log_store)
Expand Down Expand Up @@ -956,6 +957,42 @@ pub(crate) mod tests {
.try_collect::<Vec<Path>>()
.await
}

#[tokio::test]
async fn test_file_cache_enable_by_storage_option() {
use object_store::memory::InMemory;
use tempfile::tempdir;
let memory_store = Arc::new(InMemory::new());
let log_path = Path::from("delta-table/_delta_log/00000000000000000001.json");

let payload = "test";
memory_store
.put(&log_path, payload.into())
.await
.expect("Failed to write log file");

// Enable file cache by setting the storage option
let cache_dir = tempdir().unwrap();
let storge_options = HashMap::from([(
"file_cache_path".to_string(),
cache_dir.path().to_str().unwrap().to_string(),
)]);

let table_uri = "memory:///delta-table";
let table = crate::DeltaTableBuilder::from_valid_uri(table_uri)
.unwrap()
.with_storage_backend(memory_store, Url::parse(table_uri).unwrap())
.with_storage_options(storge_options)
.build()
.unwrap();

// Intentially discarding result. We just want to verify cache is enabled
let _ = table.log_store().peek_next_commit(0).await;

// File should have been cached
let cache_path = cache_dir.path().join(log_path.as_ref());
assert!(cache_path.exists());
}
}

#[cfg(all(test, feature = "datafusion"))]
Expand Down
Loading
Loading