Skip to content

Commit fb41a90

Browse files
committed
feat: implement object store that caches to disk
Signed-off-by: Peter Ke <[email protected]>
1 parent 0b1c615 commit fb41a90

File tree

4 files changed

+603
-2
lines changed

4 files changed

+603
-2
lines changed

crates/core/src/logstore/config.rs

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@
77
//! Specific pieces of configuration must implement the `TryUpdateKey` trait which
88
//! defines how to update internal fields based on key-value pairs.
99
use std::collections::HashMap;
10+
use std::sync::Arc;
1011

1112
use ::object_store::RetryConfig;
1213
use object_store::{path::Path, prefix::PrefixStore, ObjectStore, ObjectStoreScheme};
1314

15+
use super::storage::file_cache::{self, FileCacheConfig};
1416
use super::storage::LimitConfig;
1517
use super::{storage::runtime::RuntimeConfig, IORuntime};
1618
use crate::{DeltaResult, DeltaTableError};
@@ -101,6 +103,11 @@ pub struct StorageConfig {
101103
/// Configuration to limit the number of concurrent requests to the object store.
102104
pub limit: Option<LimitConfig>,
103105

106+
/// File cache configuration.
107+
///
108+
/// Configuration to enable file cache for the Delta Log Store.
109+
pub file_cache: Option<FileCacheConfig>,
110+
104111
/// Properties that are not recognized by the storage configuration.
105112
///
106113
/// These properties are ignored by the storage configuration and can be used for custom purposes.
@@ -119,12 +126,27 @@ impl StorageConfig {
119126
/// Depending on the configuration, the following layers may be added:
120127
/// - Retry layer: Adds retry logic to the object store.
121128
/// - Limit layer: Limits the number of concurrent requests to the object store.
129+
/// - File cache layer: Adds a file cache to the object store.
122130
pub fn decorate_store<T: ObjectStore + Clone>(
123131
&self,
124132
store: T,
125133
table_root: &url::Url,
126134
) -> DeltaResult<Box<dyn ObjectStore>> {
127-
let inner = Self::decorate_prefix(store, table_root)?;
135+
let inner = self.decorate_root_store(store)?;
136+
let inner = Self::decorate_prefix(inner, table_root)?;
137+
Ok(inner)
138+
}
139+
140+
/// Wraps an object store with additional layers of functionality without prefix.
141+
pub(crate) fn decorate_root_store<T: ObjectStore>(
142+
&self,
143+
store: T,
144+
) -> DeltaResult<Box<dyn ObjectStore>> {
145+
let inner = if let Some(file_cache) = self.file_cache.as_ref() {
146+
file_cache::decorate_store(Arc::new(store), file_cache)?
147+
} else {
148+
Box::new(store)
149+
};
128150
Ok(inner)
129151
}
130152

@@ -225,6 +247,11 @@ impl StorageConfig {
225247
remainder
226248
};
227249

250+
let result = ParseResult::<FileCacheConfig>::from_iter(remainder);
251+
result.raise_errors()?;
252+
props.file_cache = (!result.is_default).then_some(result.config);
253+
let remainder = result.unparsed;
254+
228255
props.unknown_properties = remainder;
229256
Ok(props)
230257
}
@@ -296,6 +323,7 @@ pub fn str_is_truthy(val: &str) -> bool {
296323

297324
#[cfg(test)]
298325
mod tests {
326+
use super::FileCacheConfig;
299327
use maplit::hashmap;
300328
use object_store::RetryConfig;
301329
use std::time::Duration;
@@ -318,4 +346,36 @@ mod tests {
318346
assert_eq!(retry_config.backoff.max_backoff, Duration::from_secs(3600));
319347
assert_eq!(retry_config.backoff.base, 50_f64);
320348
}
349+
350+
#[test]
351+
fn test_file_cache_config_from_options() {
352+
let options = hashmap! {
353+
"file_cache_path".to_string() => "/tmp/file_cache".to_string(),
354+
};
355+
356+
let (file_cache_config, remainder): (FileCacheConfig, _) =
357+
super::try_parse_impl(options).unwrap();
358+
assert!(remainder.is_empty());
359+
360+
assert_eq!(file_cache_config.path, "/tmp/file_cache");
361+
assert!(file_cache_config.last_checkpoint_valid_duration.is_none());
362+
}
363+
364+
#[test]
365+
fn test_file_cache_config_from_options_with_last_checkpoint_valid_duration() {
366+
let options = hashmap! {
367+
"file_cache_path".to_string() => "/tmp/file_cache".to_string(),
368+
"file_cache_last_checkpoint_valid_duration".to_string() => "1h".to_string(),
369+
};
370+
371+
let (file_cache_config, remainder): (FileCacheConfig, _) =
372+
super::try_parse_impl(options).unwrap();
373+
assert!(remainder.is_empty());
374+
375+
assert_eq!(file_cache_config.path, "/tmp/file_cache");
376+
assert_eq!(
377+
file_cache_config.last_checkpoint_valid_duration,
378+
Some(Duration::from_secs(3600))
379+
);
380+
}
321381
}

crates/core/src/logstore/mod.rs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,8 @@ impl<T: LogStoreFactory + ?Sized> LogStoreFactoryExt for T {
120120
location: &Url,
121121
options: &StorageConfig,
122122
) -> DeltaResult<LogStoreRef> {
123-
let prefixed_store = options.decorate_store(root_store.clone(), location)?;
123+
let root_store = Arc::new(options.decorate_root_store(root_store)?) as ObjectStoreRef;
124+
let prefixed_store = StorageConfig::decorate_prefix(root_store.clone(), location)?;
124125
let log_store =
125126
self.with_options(Arc::new(prefixed_store), root_store, location, options)?;
126127
Ok(log_store)
@@ -995,6 +996,42 @@ pub(crate) mod tests {
995996
.try_collect::<Vec<Path>>()
996997
.await
997998
}
999+
1000+
#[tokio::test]
1001+
async fn test_file_cache_enable_by_storage_option() {
1002+
use object_store::memory::InMemory;
1003+
use tempfile::tempdir;
1004+
let memory_store = Arc::new(InMemory::new());
1005+
let log_path = Path::from("delta-table/_delta_log/00000000000000000001.json");
1006+
1007+
let payload = "test";
1008+
memory_store
1009+
.put(&log_path, payload.into())
1010+
.await
1011+
.expect("Failed to write log file");
1012+
1013+
// Enable file cache by setting the storage option
1014+
let cache_dir = tempdir().unwrap();
1015+
let storge_options = HashMap::from([(
1016+
"file_cache_path".to_string(),
1017+
cache_dir.path().to_str().unwrap().to_string(),
1018+
)]);
1019+
1020+
let table_uri = "memory:///delta-table";
1021+
let table = crate::DeltaTableBuilder::from_valid_uri(table_uri)
1022+
.unwrap()
1023+
.with_storage_backend(memory_store, Url::parse(table_uri).unwrap())
1024+
.with_storage_options(storge_options)
1025+
.build()
1026+
.unwrap();
1027+
1028+
// Intentially discarding result. We just want to verify cache is enabled
1029+
let _ = table.log_store().peek_next_commit(0).await;
1030+
1031+
// File should have been cached
1032+
let cache_path = cache_dir.path().join(log_path.as_ref());
1033+
assert!(cache_path.exists());
1034+
}
9981035
}
9991036

10001037
#[cfg(all(test, feature = "datafusion"))]

0 commit comments

Comments
 (0)