diff --git a/crates/core/src/logstore/config.rs b/crates/core/src/logstore/config.rs index 7337d6373b..9335193b57 100644 --- a/crates/core/src/logstore/config.rs +++ b/crates/core/src/logstore/config.rs @@ -6,11 +6,12 @@ //! //! Specific pieces of configuration must implement the `TryUpdateKey` trait which //! defines how to update internal fields based on key-value pairs. -#[cfg(feature = "cloud")] -use ::object_store::RetryConfig; + use object_store::{path::Path, prefix::PrefixStore, ObjectStore}; use std::collections::HashMap; +use std::sync::Arc; +use super::storage::file_cache::{self, FileCacheConfig}; use super::storage::LimitConfig; use super::{storage::runtime::RuntimeConfig, IORuntime}; use crate::{DeltaResult, DeltaTableError}; @@ -103,6 +104,11 @@ pub struct StorageConfig { /// Configuration to limit the number of concurrent requests to the object store. pub limit: Option, + /// File cache configuration. + /// + /// Configuration to enable file cache for the Delta Log Store. + pub file_cache: Option, + /// Properties that are not recognized by the storage configuration. /// /// These properties are ignored by the storage configuration and can be used for custom purposes. @@ -121,12 +127,27 @@ impl StorageConfig { /// Depending on the configuration, the following layers may be added: /// - Retry layer: Adds retry logic to the object store. /// - Limit layer: Limits the number of concurrent requests to the object store. + /// - File cache layer: Adds a file cache to the object store. pub fn decorate_store( &self, store: T, table_root: &url::Url, ) -> DeltaResult> { - let inner = Self::decorate_prefix(store, table_root)?; + let inner = self.decorate_root_store(store)?; + let inner = Self::decorate_prefix(inner, table_root)?; + Ok(inner) + } + + /// Wraps an object store with additional layers of functionality without prefix. 
+ pub(crate) fn decorate_root_store( + &self, + store: T, + ) -> DeltaResult> { + let inner = if let Some(file_cache) = self.file_cache.as_ref() { + file_cache::decorate_store(Arc::new(store), file_cache)? + } else { + Box::new(store) + }; Ok(inner) } @@ -169,7 +190,7 @@ where #[cfg(feature = "cloud")] let remainder = { - let result = ParseResult::::from_iter(remainder); + let result = ParseResult::::from_iter(remainder); config.retry = result.config; result.unparsed }; @@ -225,6 +246,11 @@ impl StorageConfig { remainder }; + let result = ParseResult::::from_iter(remainder); + result.raise_errors()?; + props.file_cache = (!result.is_default).then_some(result.config); + let remainder = result.unparsed; + props.unknown_properties = remainder; Ok(props) } @@ -403,4 +429,36 @@ mod tests { StorageConfig::default().with_io_runtime(IORuntime::Config(RuntimeConfig::default())); assert!(config.runtime.is_some()); } + + #[test] + fn test_file_cache_config_from_options() { + let options = hashmap! { + "file_cache_path".to_string() => "/tmp/file_cache".to_string(), + }; + + let (file_cache_config, remainder): (FileCacheConfig, _) = + super::try_parse_impl(options).unwrap(); + assert!(remainder.is_empty()); + + assert_eq!(file_cache_config.path, "/tmp/file_cache"); + assert!(file_cache_config.last_checkpoint_valid_duration.is_none()); + } + + #[test] + fn test_file_cache_config_from_options_with_last_checkpoint_valid_duration() { + let options = hashmap! 
{ + "file_cache_path".to_string() => "/tmp/file_cache".to_string(), + "file_cache_last_checkpoint_valid_duration".to_string() => "1h".to_string(), + }; + + let (file_cache_config, remainder): (FileCacheConfig, _) = + super::try_parse_impl(options).unwrap(); + assert!(remainder.is_empty()); + + assert_eq!(file_cache_config.path, "/tmp/file_cache"); + assert_eq!( + file_cache_config.last_checkpoint_valid_duration, + Some(Duration::from_secs(3600)) + ); + } } diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index ea2d359979..3ac0d04bf6 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -121,7 +121,8 @@ impl LogStoreFactoryExt for T { location: &Url, options: &StorageConfig, ) -> DeltaResult { - let prefixed_store = options.decorate_store(root_store.clone(), location)?; + let root_store = Arc::new(options.decorate_root_store(root_store)?) as ObjectStoreRef; + let prefixed_store = StorageConfig::decorate_prefix(root_store.clone(), location)?; let log_store = self.with_options(Arc::new(prefixed_store), root_store, location, options)?; Ok(log_store) @@ -956,6 +957,42 @@ pub(crate) mod tests { .try_collect::>() .await } + + #[tokio::test] + async fn test_file_cache_enable_by_storage_option() { + use object_store::memory::InMemory; + use tempfile::tempdir; + let memory_store = Arc::new(InMemory::new()); + let log_path = Path::from("delta-table/_delta_log/00000000000000000001.json"); + + let payload = "test"; + memory_store + .put(&log_path, payload.into()) + .await + .expect("Failed to write log file"); + + // Enable file cache by setting the storage option + let cache_dir = tempdir().unwrap(); + let storge_options = HashMap::from([( + "file_cache_path".to_string(), + cache_dir.path().to_str().unwrap().to_string(), + )]); + + let table_uri = "memory:///delta-table"; + let table = crate::DeltaTableBuilder::from_valid_uri(table_uri) + .unwrap() + .with_storage_backend(memory_store, 
Url::parse(table_uri).unwrap()) + .with_storage_options(storge_options) + .build() + .unwrap(); + + // Intentionally discarding result. We just want to verify cache is enabled + let _ = table.log_store().peek_next_commit(0).await; + + // File should have been cached + let cache_path = cache_dir.path().join(log_path.as_ref()); + assert!(cache_path.exists()); + } } diff --git a/crates/core/src/logstore/storage/file_cache.rs b/crates/core/src/logstore/storage/file_cache.rs new file mode 100644 index 0000000000..4e5a9602a0 --- /dev/null +++ b/crates/core/src/logstore/storage/file_cache.rs @@ -0,0 +1,513 @@ +use deltalake_derive::DeltaConfig; +use std::{fmt::Debug, sync::Arc, time::Duration}; + +use crate::{table::builder::ensure_table_uri, DeltaResult, DeltaTableError}; + +use super::ObjectStoreRef; +use dashmap::DashMap; +use futures::stream::BoxStream; +use object_store::{ + local::LocalFileSystem, path::Path, Error as ObjectStoreError, GetOptions, GetResult, + ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, + PutResult, Result as ObjectStoreResult, +}; +use tokio::sync::Mutex; + +#[async_trait::async_trait] +pub trait FileCachePolicy: Debug + Send + Sync + 'static { + fn should_use_cache(&self, _location: &Path) -> bool { + true + } + + async fn should_update_cache( + &self, + cache_meta: &ObjectMeta, + object_store: ObjectStoreRef, + ) -> ObjectStoreResult { + let meta = object_store.head(&cache_meta.location).await?; + Ok(cache_meta.last_modified < meta.last_modified) + } +} + +#[derive(Debug)] +pub struct DefaultFileCachePolicy {} + +#[async_trait::async_trait] +impl FileCachePolicy for DefaultFileCachePolicy {} + +#[derive(Debug)] +struct DeltaFileCachePolicy { + last_checkpoint_valid_duration: Duration, +} + +impl DeltaFileCachePolicy { + pub fn new(last_checkpoint_valid_duration: Duration) -> Self { + Self { + last_checkpoint_valid_duration, + } + } +} + 
+#[async_trait::async_trait] +impl FileCachePolicy for DeltaFileCachePolicy { + async fn should_update_cache( + &self, + cache_meta: &ObjectMeta, + object_store: ObjectStoreRef, + ) -> object_store::Result { + let location = &cache_meta.location; + if location.filename() == Some("_last_checkpoint") { + let meta = object_store.head(location).await?; + Ok(cache_meta.last_modified + self.last_checkpoint_valid_duration < meta.last_modified) + } else { + // Other delta files are always immutable, so no need to update cache + Ok(false) + } + } +} + +#[derive(Debug, Clone, Default, DeltaConfig)] +pub struct FileCacheConfig { + /// The path to the file cache. + #[delta(alias = "file_cache_path", env = "FILE_CACHE_PATH")] + pub path: String, + + /// The duration for which the file cache is valid for _last_checkpoint files. + /// If not set, it defaults to zero, which means the cache is not used. + #[delta( + alias = "file_cache_last_checkpoint_valid_duration", + env = "FILE_CACHE_LAST_CHECKPOINT_VALID_DURATION" + )] + pub last_checkpoint_valid_duration: Option, +} + +/// Decorates the given object store with a file cache. 
+pub fn decorate_store( + store: ObjectStoreRef, + config: &FileCacheConfig, +) -> DeltaResult> { + let location = &config.path; + let path = ensure_table_uri(location)?.to_file_path().map_err(|_| { + DeltaTableError::generic(format!( + "Expected file_cache_path to be a valid file path: {location}", + )) + })?; + + let last_checkpoint_valid_duration = config + .last_checkpoint_valid_duration + .unwrap_or(Duration::ZERO); + + let file_cache = LocalFileSystem::new_with_prefix(path)?; + + Ok(Box::new(FileCacheStorageBackend::try_new( + store, + file_cache, + Arc::new(DeltaFileCachePolicy::new(last_checkpoint_valid_duration)), + )?)) +} + +#[derive(Debug)] +pub struct FileCacheStorageBackend { + inner: ObjectStoreRef, + file_cache: T, + cache_policy: Arc, + // Threads must hold the lock to download the file to cache to prevent + // multiple threads from downloading the same file at the same time. + in_progress_files: Arc>>>, +} + +impl FileCacheStorageBackend { + pub fn try_new( + inner: ObjectStoreRef, + file_cache: T, + cache_policy: Arc, + ) -> ObjectStoreResult { + Ok(Self { + inner, + file_cache, + cache_policy, + in_progress_files: Arc::new(DashMap::new()), + }) + } +} + +impl std::fmt::Display for FileCacheStorageBackend { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "FileCacheStorageBackend {{ inner: {:?}, file_backend: {:?} }}", + self.inner, self.file_cache + ) + } +} + +impl FileCacheStorageBackend { + async fn ensure_cache_populated( + &self, + location: &Path, + options: &GetOptions, + ) -> ObjectStoreResult<()> { + // NOTE: LocalFileSystem has different support for various options, e.g. version. 
+ // I don't think they're used in delta-rs + + let in_progress_file = self + .in_progress_files + .entry(location.to_owned()) + .or_insert_with(|| Arc::new(Mutex::new(()))) + .clone(); + let _guard = in_progress_file.lock().await; + + match self.file_cache.head(location).await { + Ok(meta) => { + if !self + .cache_policy + .should_update_cache(&meta, self.inner.clone()) + .await? + { + tracing::debug!("Cache is up to date: {location:?}"); + + self.in_progress_files.remove(location); + return Ok(()); + } + } + Err(ObjectStoreError::NotFound { .. }) => {} + Err(err) => return Err(err), + } + + tracing::debug!("Downloading file to cache: {location:?}"); + + let options_without_range = GetOptions { + range: None, + ..options.clone() + }; + + let bytes = self + .inner + .get_opts(location, options_without_range) + .await? + .bytes() + .await?; + + self.file_cache + .put(location, PutPayload::from_bytes(bytes)) + .await?; + + tracing::debug!("Finished downloading file to cache: {location:?}"); + + self.in_progress_files.remove(location); + + Ok(()) + } +} + +#[async_trait::async_trait] +impl ObjectStore for FileCacheStorageBackend { + async fn put_opts( + &self, + location: &Path, + bytes: PutPayload, + options: PutOptions, + ) -> ObjectStoreResult { + self.inner.put_opts(location, bytes, options).await + } + + async fn put_multipart_opts( + &self, + location: &Path, + options: PutMultipartOpts, + ) -> ObjectStoreResult> { + self.inner.put_multipart_opts(location, options).await + } + + async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult { + if !self.cache_policy.should_use_cache(location) { + return self.inner.get_opts(location, options).await; + } + + self.ensure_cache_populated(location, &options).await?; + + // NOTE: GetResult also contains meta and other attributes which may be different + // when using a local cache. 
+ self.file_cache.get_opts(location, options).await + } + + async fn head(&self, location: &Path) -> ObjectStoreResult { + self.inner.head(location).await + } + + async fn delete(&self, location: &Path) -> ObjectStoreResult<()> { + self.inner.delete(location).await + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, ObjectStoreResult> { + self.inner.list(prefix) + } + + fn list_with_offset( + &self, + prefix: Option<&Path>, + offset: &Path, + ) -> BoxStream<'static, ObjectStoreResult> { + self.inner.list_with_offset(prefix, offset) + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult { + self.inner.list_with_delimiter(prefix).await + } + + async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { + self.inner.copy(from, to).await + } + + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { + self.inner.copy_if_not_exists(from, to).await + } + + async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { + self.inner.rename_if_not_exists(from, to).await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::logstore::storage::ObjectStoreRef; + use object_store::memory::InMemory; + use object_store::{path::Path, ObjectMeta, ObjectStore}; + use std::sync::Arc; + + // Helper struct to count the number of times get is called for each file + #[derive(Debug)] + struct CountingObjectStore { + inner: ObjectStoreRef, + get_counts: Arc>, + } + + impl std::fmt::Display for CountingObjectStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "CountingObjectStore {{ get_count: {:?} }}", + self.get_counts + ) + } + } + + #[async_trait::async_trait] + impl ObjectStore for CountingObjectStore { + async fn head(&self, location: &Path) -> ObjectStoreResult { + self.inner.head(location).await + } + + async fn get_opts( + &self, + location: &Path, + options: GetOptions, + ) -> ObjectStoreResult { + let mut entry 
= self.get_counts.entry(location.to_owned()).or_insert(0); + *entry += 1; + + self.inner.get_opts(location, options).await + } + + async fn put_opts( + &self, + location: &Path, + bytes: PutPayload, + options: PutOptions, + ) -> ObjectStoreResult { + self.inner.put_opts(location, bytes, options).await + } + + async fn put_multipart_opts( + &self, + location: &Path, + options: PutMultipartOpts, + ) -> ObjectStoreResult> { + self.inner.put_multipart_opts(location, options).await + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, ObjectStoreResult> { + self.inner.list(prefix) + } + + async fn list_with_delimiter( + &self, + prefix: Option<&Path>, + ) -> ObjectStoreResult { + self.inner.list_with_delimiter(prefix).await + } + + async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { + self.inner.copy(from, to).await + } + + async fn delete(&self, location: &Path) -> ObjectStoreResult<()> { + self.inner.delete(location).await + } + + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { + self.inner.copy_if_not_exists(from, to).await + } + } + + #[tokio::test] + async fn test_file_cached_on_get() { + let inner_store = Arc::new(InMemory::new()); + let cache_store = Arc::new(InMemory::new()) as Arc; + let file_cache_store = Arc::new( + FileCacheStorageBackend::try_new( + inner_store.clone(), + cache_store.clone(), + Arc::new(DefaultFileCachePolicy {}), + ) + .unwrap(), + ); + + let file_path = Path::from("test_file.txt"); + let file_content = b"hello world"; + + // Put file in inner store + inner_store + .put(&file_path, file_content.to_vec().into()) + .await + .unwrap(); + + // Get file, which should cache it + let result = file_cache_store.get(&file_path).await.unwrap(); + let data = result.bytes().await.unwrap(); + assert_eq!(data, file_content.to_vec()); + + // Check if file is in cache, and content is correct + let cache_file_path = cache_store.head(&file_path).await; + assert!(cache_file_path.is_ok()); 
+ + let cached_data = cache_store + .get(&file_path) + .await + .unwrap() + .bytes() + .await + .unwrap(); + assert_eq!(cached_data, file_content.to_vec()); + } + + #[tokio::test] + async fn test_should_only_cache_files_matching_policy() { + let inner_store = Arc::new(CountingObjectStore { + inner: Arc::new(InMemory::new()), + get_counts: Arc::new(DashMap::new()), + }); + let cache_store = Arc::new(InMemory::new()) as Arc; + + #[derive(Debug)] + struct CacheSpecificFilePolicy {} + + #[async_trait::async_trait] + impl FileCachePolicy for CacheSpecificFilePolicy { + fn should_use_cache(&self, location: &Path) -> bool { + location.to_string() == "test_file_cache.txt" + } + } + + let file_cache_store = Arc::new( + FileCacheStorageBackend::try_new( + inner_store.clone(), + cache_store.clone(), + Arc::new(CacheSpecificFilePolicy {}), + ) + .unwrap(), + ); + + let file_path = Path::from("test_file_no_cache.txt"); + let file_path_cached = Path::from("test_file_cache.txt"); + let file_content = b"should not be cached"; + let file_content_cached = b"should be cached"; + + inner_store + .put(&file_path, file_content.to_vec().into()) + .await + .unwrap(); + + inner_store + .put(&file_path_cached, file_content_cached.to_vec().into()) + .await + .unwrap(); + + // Check file that should not be cached + let data = file_cache_store + .get(&file_path) + .await + .unwrap() + .bytes() + .await + .unwrap(); + assert_eq!(data, file_content.to_vec()); + + // Verify inner store was accessed once + assert_eq!(*inner_store.get_counts.get(&file_path).unwrap().value(), 1); + + // Check no file has been saved to cache + let cache_file_path = cache_store.head(&file_path).await; + assert!(matches!( + cache_file_path, + Err(ObjectStoreError::NotFound { .. }) + )); + + // Try accessing it again. 
+ let data = file_cache_store + .get(&file_path) + .await + .unwrap() + .bytes() + .await + .unwrap(); + assert_eq!(data, file_content.to_vec()); + + // Verify inner store was accessed again + assert_eq!(*inner_store.get_counts.get(&file_path).unwrap().value(), 2); + + // Get file that should be cached + let data = file_cache_store + .get(&file_path_cached) + .await + .unwrap() + .bytes() + .await + .unwrap(); + assert_eq!(data, file_content_cached.to_vec()); + + // Verify inner store was accessed once + assert_eq!( + *inner_store + .get_counts + .get(&file_path_cached) + .unwrap() + .value(), + 1 + ); + + let cache_file_path = cache_store.head(&file_path_cached).await; + assert!(cache_file_path.is_ok()); + + // Try accessing it again. + let data = file_cache_store + .get(&file_path_cached) + .await + .unwrap() + .bytes() + .await + .unwrap(); + assert_eq!(data, file_content_cached.to_vec()); + + // Verify inner store was not accessed again + assert_eq!( + *inner_store + .get_counts + .get(&file_path_cached) + .unwrap() + .value(), + 1 + ); + } +} diff --git a/crates/core/src/logstore/storage/mod.rs b/crates/core/src/logstore/storage/mod.rs index 97508e4274..f850eca98a 100644 --- a/crates/core/src/logstore/storage/mod.rs +++ b/crates/core/src/logstore/storage/mod.rs @@ -12,6 +12,7 @@ use deltalake_derive::DeltaConfig; pub use retry_ext::ObjectStoreRetryExt; pub use runtime::{DeltaIOStorageBackend, IORuntime}; +pub(super) mod file_cache; pub(super) mod retry_ext; pub(super) mod runtime; pub(super) mod utils;