Skip to content

Commit a6dbb4e

Browse files
committed
feat: implement object store that caches to disk
Signed-off-by: Peter Ke <[email protected]>
1 parent 6baef44 commit a6dbb4e

File tree

4 files changed

+612
-2
lines changed

4 files changed

+612
-2
lines changed

crates/core/src/logstore/config.rs

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@
77
//! Specific pieces of configuration must implement the `TryUpdateKey` trait which
88
//! defines how to update internal fields based on key-value pairs.
99
use std::collections::HashMap;
10+
use std::sync::Arc;
1011

1112
use ::object_store::RetryConfig;
1213
use object_store::{path::Path, prefix::PrefixStore, ObjectStore};
1314

15+
use super::storage::file_cache::{self, FileCacheConfig};
1416
use super::storage::LimitConfig;
1517
use super::{storage::runtime::RuntimeConfig, IORuntime};
1618
use crate::{DeltaResult, DeltaTableError};
@@ -102,6 +104,11 @@ pub struct StorageConfig {
102104
/// Configuration to limit the number of concurrent requests to the object store.
103105
pub limit: Option<LimitConfig>,
104106

107+
/// File cache configuration.
108+
///
109+
/// Configuration to enable file cache for the Delta Log Store.
110+
pub file_cache: Option<FileCacheConfig>,
111+
105112
/// Properties that are not recognized by the storage configuration.
106113
///
107114
/// These properties are ignored by the storage configuration and can be used for custom purposes.
@@ -120,12 +127,27 @@ impl StorageConfig {
120127
/// Depending on the configuration, the following layers may be added:
121128
/// - Retry layer: Adds retry logic to the object store.
122129
/// - Limit layer: Limits the number of concurrent requests to the object store.
130+
/// - File cache layer: Adds a file cache to the object store.
123131
pub fn decorate_store<T: ObjectStore + Clone>(
124132
&self,
125133
store: T,
126134
table_root: &url::Url,
127135
) -> DeltaResult<Box<dyn ObjectStore>> {
128-
let inner = Self::decorate_prefix(store, table_root)?;
136+
let inner = self.decorate_root_store(store)?;
137+
let inner = Self::decorate_prefix(inner, table_root)?;
138+
Ok(inner)
139+
}
140+
141+
/// Wraps an object store with additional layers of functionality without prefix.
142+
pub(crate) fn decorate_root_store<T: ObjectStore>(
143+
&self,
144+
store: T,
145+
) -> DeltaResult<Box<dyn ObjectStore>> {
146+
let inner = if let Some(file_cache) = self.file_cache.as_ref() {
147+
file_cache::decorate_store(Arc::new(store), file_cache)?
148+
} else {
149+
Box::new(store)
150+
};
129151
Ok(inner)
130152
}
131153

@@ -222,6 +244,11 @@ impl StorageConfig {
222244
remainder
223245
};
224246

247+
let result = ParseResult::<FileCacheConfig>::from_iter(remainder);
248+
result.raise_errors()?;
249+
props.file_cache = (!result.is_default).then_some(result.config);
250+
let remainder = result.unparsed;
251+
225252
props.unknown_properties = remainder;
226253
Ok(props)
227254
}
@@ -396,4 +423,36 @@ mod tests {
396423
StorageConfig::default().with_io_runtime(IORuntime::Config(RuntimeConfig::default()));
397424
assert!(config.runtime.is_some());
398425
}
426+
427+
#[test]
428+
fn test_file_cache_config_from_options() {
429+
let options = hashmap! {
430+
"file_cache_path".to_string() => "/tmp/file_cache".to_string(),
431+
};
432+
433+
let (file_cache_config, remainder): (FileCacheConfig, _) =
434+
super::try_parse_impl(options).unwrap();
435+
assert!(remainder.is_empty());
436+
437+
assert_eq!(file_cache_config.path, "/tmp/file_cache");
438+
assert!(file_cache_config.last_checkpoint_valid_duration.is_none());
439+
}
440+
441+
#[test]
442+
fn test_file_cache_config_from_options_with_last_checkpoint_valid_duration() {
443+
let options = hashmap! {
444+
"file_cache_path".to_string() => "/tmp/file_cache".to_string(),
445+
"file_cache_last_checkpoint_valid_duration".to_string() => "1h".to_string(),
446+
};
447+
448+
let (file_cache_config, remainder): (FileCacheConfig, _) =
449+
super::try_parse_impl(options).unwrap();
450+
assert!(remainder.is_empty());
451+
452+
assert_eq!(file_cache_config.path, "/tmp/file_cache");
453+
assert_eq!(
454+
file_cache_config.last_checkpoint_valid_duration,
455+
Some(Duration::from_secs(3600))
456+
);
457+
}
399458
}

crates/core/src/logstore/mod.rs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,8 @@ impl<T: LogStoreFactory + ?Sized> LogStoreFactoryExt for T {
121121
location: &Url,
122122
options: &StorageConfig,
123123
) -> DeltaResult<LogStoreRef> {
124-
let prefixed_store = options.decorate_store(root_store.clone(), location)?;
124+
let root_store = Arc::new(options.decorate_root_store(root_store)?) as ObjectStoreRef;
125+
let prefixed_store = StorageConfig::decorate_prefix(root_store.clone(), location)?;
125126
let log_store =
126127
self.with_options(Arc::new(prefixed_store), root_store, location, options)?;
127128
Ok(log_store)
@@ -961,6 +962,42 @@ pub(crate) mod tests {
961962
.try_collect::<Vec<Path>>()
962963
.await
963964
}
965+
966+
#[tokio::test]
967+
async fn test_file_cache_enable_by_storage_option() {
968+
use object_store::memory::InMemory;
969+
use tempfile::tempdir;
970+
let memory_store = Arc::new(InMemory::new());
971+
let log_path = Path::from("delta-table/_delta_log/00000000000000000001.json");
972+
973+
let payload = "test";
974+
memory_store
975+
.put(&log_path, payload.into())
976+
.await
977+
.expect("Failed to write log file");
978+
979+
// Enable file cache by setting the storage option
980+
let cache_dir = tempdir().unwrap();
981+
let storge_options = HashMap::from([(
982+
"file_cache_path".to_string(),
983+
cache_dir.path().to_str().unwrap().to_string(),
984+
)]);
985+
986+
let table_uri = "memory:///delta-table";
987+
let table = crate::DeltaTableBuilder::from_valid_uri(table_uri)
988+
.unwrap()
989+
.with_storage_backend(memory_store, Url::parse(table_uri).unwrap())
990+
.with_storage_options(storge_options)
991+
.build()
992+
.unwrap();
993+
994+
// Intentially discarding result. We just want to verify cache is enabled
995+
let _ = table.log_store().peek_next_commit(0).await;
996+
997+
// File should have been cached
998+
let cache_path = cache_dir.path().join(log_path.as_ref());
999+
assert!(cache_path.exists());
1000+
}
9641001
}
9651002

9661003
#[cfg(all(test, feature = "datafusion"))]

0 commit comments

Comments
 (0)