Skip to content

Commit 3643c91

Browse files
PeterKeDerrtyler
authored andcommitted
feat: implement object store that caches to disk
Signed-off-by: Peter Ke <[email protected]>
1 parent 6a4f249 commit 3643c91

File tree

4 files changed

+614
-5
lines changed

4 files changed

+614
-5
lines changed

crates/core/src/logstore/config.rs

Lines changed: 62 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@
66
//!
77
//! Specific pieces of configuration must implement the `TryUpdateKey` trait which
88
//! defines how to update internal fields based on key-value pairs.
9-
#[cfg(feature = "cloud")]
10-
use ::object_store::RetryConfig;
9+
1110
use object_store::{path::Path, prefix::PrefixStore, ObjectStore};
1211
use std::collections::HashMap;
12+
use std::sync::Arc;
1313

14+
use super::storage::file_cache::{self, FileCacheConfig};
1415
use super::storage::LimitConfig;
1516
use super::{storage::runtime::RuntimeConfig, IORuntime};
1617
use crate::{DeltaResult, DeltaTableError};
@@ -103,6 +104,11 @@ pub struct StorageConfig {
103104
/// Configuration to limit the number of concurrent requests to the object store.
104105
pub limit: Option<LimitConfig>,
105106

107+
/// File cache configuration.
108+
///
109+
/// Configuration to enable file cache for the Delta Log Store.
110+
pub file_cache: Option<FileCacheConfig>,
111+
106112
/// Properties that are not recognized by the storage configuration.
107113
///
108114
/// These properties are ignored by the storage configuration and can be used for custom purposes.
@@ -121,12 +127,27 @@ impl StorageConfig {
121127
/// Depending on the configuration, the following layers may be added:
122128
/// - Retry layer: Adds retry logic to the object store.
123129
/// - Limit layer: Limits the number of concurrent requests to the object store.
130+
/// - File cache layer: Adds a file cache to the object store.
124131
pub fn decorate_store<T: ObjectStore + Clone>(
125132
&self,
126133
store: T,
127134
table_root: &url::Url,
128135
) -> DeltaResult<Box<dyn ObjectStore>> {
129-
let inner = Self::decorate_prefix(store, table_root)?;
136+
let inner = self.decorate_root_store(store)?;
137+
let inner = Self::decorate_prefix(inner, table_root)?;
138+
Ok(inner)
139+
}
140+
141+
/// Wraps an object store with additional layers of functionality without prefix.
142+
pub(crate) fn decorate_root_store<T: ObjectStore>(
143+
&self,
144+
store: T,
145+
) -> DeltaResult<Box<dyn ObjectStore>> {
146+
let inner = if let Some(file_cache) = self.file_cache.as_ref() {
147+
file_cache::decorate_store(Arc::new(store), file_cache)?
148+
} else {
149+
Box::new(store)
150+
};
130151
Ok(inner)
131152
}
132153

@@ -169,7 +190,7 @@ where
169190

170191
#[cfg(feature = "cloud")]
171192
let remainder = {
172-
let result = ParseResult::<RetryConfig>::from_iter(remainder);
193+
let result = ParseResult::<object_store::RetryConfig>::from_iter(remainder);
173194
config.retry = result.config;
174195
result.unparsed
175196
};
@@ -225,6 +246,11 @@ impl StorageConfig {
225246
remainder
226247
};
227248

249+
let result = ParseResult::<FileCacheConfig>::from_iter(remainder);
250+
result.raise_errors()?;
251+
props.file_cache = (!result.is_default).then_some(result.config);
252+
let remainder = result.unparsed;
253+
228254
props.unknown_properties = remainder;
229255
Ok(props)
230256
}
@@ -403,4 +429,36 @@ mod tests {
403429
StorageConfig::default().with_io_runtime(IORuntime::Config(RuntimeConfig::default()));
404430
assert!(config.runtime.is_some());
405431
}
432+
433+
#[test]
434+
fn test_file_cache_config_from_options() {
435+
let options = hashmap! {
436+
"file_cache_path".to_string() => "/tmp/file_cache".to_string(),
437+
};
438+
439+
let (file_cache_config, remainder): (FileCacheConfig, _) =
440+
super::try_parse_impl(options).unwrap();
441+
assert!(remainder.is_empty());
442+
443+
assert_eq!(file_cache_config.path, "/tmp/file_cache");
444+
assert!(file_cache_config.last_checkpoint_valid_duration.is_none());
445+
}
446+
447+
#[test]
448+
fn test_file_cache_config_from_options_with_last_checkpoint_valid_duration() {
449+
let options = hashmap! {
450+
"file_cache_path".to_string() => "/tmp/file_cache".to_string(),
451+
"file_cache_last_checkpoint_valid_duration".to_string() => "1h".to_string(),
452+
};
453+
454+
let (file_cache_config, remainder): (FileCacheConfig, _) =
455+
super::try_parse_impl(options).unwrap();
456+
assert!(remainder.is_empty());
457+
458+
assert_eq!(file_cache_config.path, "/tmp/file_cache");
459+
assert_eq!(
460+
file_cache_config.last_checkpoint_valid_duration,
461+
Some(Duration::from_secs(3600))
462+
);
463+
}
406464
}

crates/core/src/logstore/mod.rs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,8 @@ impl<T: LogStoreFactory + ?Sized> LogStoreFactoryExt for T {
121121
location: &Url,
122122
options: &StorageConfig,
123123
) -> DeltaResult<LogStoreRef> {
124-
let prefixed_store = options.decorate_store(root_store.clone(), location)?;
124+
let root_store = Arc::new(options.decorate_root_store(root_store)?) as ObjectStoreRef;
125+
let prefixed_store = StorageConfig::decorate_prefix(root_store.clone(), location)?;
125126
let log_store =
126127
self.with_options(Arc::new(prefixed_store), root_store, location, options)?;
127128
Ok(log_store)
@@ -956,6 +957,42 @@ pub(crate) mod tests {
956957
.try_collect::<Vec<Path>>()
957958
.await
958959
}
960+
961+
#[tokio::test]
962+
async fn test_file_cache_enable_by_storage_option() {
963+
use object_store::memory::InMemory;
964+
use tempfile::tempdir;
965+
let memory_store = Arc::new(InMemory::new());
966+
let log_path = Path::from("delta-table/_delta_log/00000000000000000001.json");
967+
968+
let payload = "test";
969+
memory_store
970+
.put(&log_path, payload.into())
971+
.await
972+
.expect("Failed to write log file");
973+
974+
// Enable file cache by setting the storage option
975+
let cache_dir = tempdir().unwrap();
976+
let storge_options = HashMap::from([(
977+
"file_cache_path".to_string(),
978+
cache_dir.path().to_str().unwrap().to_string(),
979+
)]);
980+
981+
let table_uri = "memory:///delta-table";
982+
let table = crate::DeltaTableBuilder::from_valid_uri(table_uri)
983+
.unwrap()
984+
.with_storage_backend(memory_store, Url::parse(table_uri).unwrap())
985+
.with_storage_options(storge_options)
986+
.build()
987+
.unwrap();
988+
989+
// Intentially discarding result. We just want to verify cache is enabled
990+
let _ = table.log_store().peek_next_commit(0).await;
991+
992+
// File should have been cached
993+
let cache_path = cache_dir.path().join(log_path.as_ref());
994+
assert!(cache_path.exists());
995+
}
959996
}
960997

961998
#[cfg(all(test, feature = "datafusion"))]

0 commit comments

Comments
 (0)