Skip to content

Commit ad7ba08

Browse files
committed
Implement object store that caches to disk
Signed-off-by: Peter Ke <[email protected]>
1 parent b058e86 commit ad7ba08

File tree

3 files changed

+533
-1
lines changed

3 files changed

+533
-1
lines changed

crates/core/src/logstore/mod.rs

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ use std::cmp::{max, min};
5151
use std::collections::HashMap;
5252
use std::io::{BufRead, BufReader, Cursor};
5353
use std::sync::{Arc, LazyLock};
54+
use std::time::Duration;
5455

5556
use bytes::Bytes;
5657
#[cfg(feature = "datafusion")]
@@ -61,11 +62,13 @@ use delta_kernel::engine::default::executor::tokio::{
6162
use delta_kernel::engine::default::DefaultEngine;
6263
use delta_kernel::{AsAny, Engine};
6364
use futures::{StreamExt, TryStreamExt};
65+
use humantime::parse_duration;
6466
use object_store::{path::Path, Error as ObjectStoreError, ObjectStore};
6567
use regex::Regex;
6668
use serde::de::{Error, SeqAccess, Visitor};
6769
use serde::ser::SerializeSeq;
6870
use serde::{Deserialize, Serialize};
71+
use storage::file_cache::{FileCachePolicy, FileCacheStorageBackend};
6972
use tokio::runtime::RuntimeFlavor;
7073
use tracing::{debug, error, warn};
7174
use url::Url;
@@ -75,6 +78,7 @@ use crate::kernel::log_segment::PathExt;
7578
use crate::kernel::transaction::TransactionError;
7679
use crate::kernel::Action;
7780
use crate::protocol::{get_last_checkpoint, ProtocolError};
81+
use crate::table::builder::ensure_table_uri;
7882
use crate::{DeltaResult, DeltaTableError};
7983

8084
pub use self::config::StorageConfig;
@@ -191,20 +195,80 @@ pub fn logstore_for(location: Url, storage_config: StorageConfig) -> DeltaResult
191195
Err(DeltaTableError::InvalidTableLocation(location.into()))
192196
}
193197

198+
/// Cache-invalidation policy for Delta log files held in a
/// [`FileCacheStorageBackend`].
///
/// Every object in a Delta log is written once and never modified, with the
/// single exception of `_last_checkpoint`, which is rewritten as new
/// checkpoints are created. This policy therefore only ever re-validates
/// `_last_checkpoint`, and only once the configured validity window has
/// passed.
#[derive(Debug)]
struct DeltaFileCachePolicy {
    // How long a cached `_last_checkpoint` entry is trusted before the
    // backing store is consulted again.
    last_checkpoint_valid_duration: Duration,
}
202+
203+
impl DeltaFileCachePolicy {
204+
pub fn new(last_checkpoint_valid_duration: Duration) -> Self {
205+
Self {
206+
last_checkpoint_valid_duration,
207+
}
208+
}
209+
}
210+
211+
#[async_trait::async_trait]
212+
impl FileCachePolicy for DeltaFileCachePolicy {
213+
async fn should_update_cache(
214+
&self,
215+
cache_meta: &object_store::ObjectMeta,
216+
object_store: ObjectStoreRef,
217+
) -> object_store::Result<bool> {
218+
let location = &cache_meta.location;
219+
if location.filename() == Some("_last_checkpoint") {
220+
let meta = object_store.head(location).await?;
221+
Ok(cache_meta.last_modified + self.last_checkpoint_valid_duration < meta.last_modified)
222+
} else {
223+
// Other delta files are always immutable, so no need to update cache
224+
Ok(false)
225+
}
226+
}
227+
}
228+
194229
/// Return the [LogStoreRef] using the given [ObjectStoreRef]
195230
pub fn logstore_with(
196231
root_store: ObjectStoreRef,
197232
location: Url,
198233
storage_config: StorageConfig,
199234
) -> DeltaResult<LogStoreRef> {
235+
let store = if let Some(location) = storage_config.raw.get("file_cache_path") {
236+
let path = ensure_table_uri(location)?.to_file_path().map_err(|_| {
237+
DeltaTableError::generic(format!(
238+
"Expected file_cache_path to be a valid file path: {location}",
239+
))
240+
})?;
241+
242+
let last_checkpoint_valid_duration = if let Some(duration_str) = storage_config
243+
.raw
244+
.get("file_cache_last_checkpoint_valid_duration")
245+
{
246+
parse_duration(duration_str).map_err(|e| {
247+
DeltaTableError::generic(
248+
format!("Failed to parse {duration_str} as Duration: {e}",),
249+
)
250+
})?
251+
} else {
252+
Duration::ZERO
253+
};
254+
255+
Arc::new(FileCacheStorageBackend::try_new(
256+
root_store,
257+
path,
258+
Arc::new(DeltaFileCachePolicy::new(last_checkpoint_valid_duration)),
259+
)?)
260+
} else {
261+
root_store
262+
};
263+
200264
let scheme = Url::parse(&format!("{}://", location.scheme()))
201265
.map_err(|_| DeltaTableError::InvalidTableLocation(location.clone().into()))?;
202266

203267
if let Some(factory) = logstore_factories().get(&scheme) {
204268
debug!("Found a logstore provider for {scheme}");
205269
return factory
206270
.value()
207-
.with_options_internal(root_store, &location, &storage_config);
271+
.with_options_internal(store, &location, &storage_config);
208272
}
209273

210274
error!("Could not find a logstore for the scheme {scheme}");
@@ -913,6 +977,42 @@ pub(crate) mod tests {
913977
.try_collect::<Vec<Path>>()
914978
.await
915979
}
980+
981+
#[tokio::test]
982+
async fn test_file_cache_enable_by_storage_option() {
983+
use object_store::memory::InMemory;
984+
use tempfile::tempdir;
985+
let memory_store = Arc::new(InMemory::new());
986+
let log_path = Path::from("delta-table/_delta_log/00000000000000000001.json");
987+
988+
let payload = "test";
989+
memory_store
990+
.put(&log_path, payload.into())
991+
.await
992+
.expect("Failed to write log file");
993+
994+
// Enable file cache by setting the storage option
995+
let cache_dir = tempdir().unwrap();
996+
let storge_options = HashMap::from([(
997+
"file_cache_path".to_string(),
998+
cache_dir.path().to_str().unwrap().to_string(),
999+
)]);
1000+
1001+
let table_uri = "memory:///delta-table";
1002+
let table = crate::DeltaTableBuilder::from_valid_uri(table_uri)
1003+
.unwrap()
1004+
.with_storage_backend(memory_store, Url::parse(table_uri).unwrap())
1005+
.with_storage_options(storge_options)
1006+
.build()
1007+
.unwrap();
1008+
1009+
// Intentially discarding result. We just want to verify cache is enabled
1010+
let _ = table.log_store().peek_next_commit(0).await;
1011+
1012+
// File should have been cached
1013+
let cache_path = cache_dir.path().join(log_path.as_ref());
1014+
assert!(cache_path.exists());
1015+
}
9161016
}
9171017

9181018
#[cfg(all(test, feature = "datafusion"))]

0 commit comments

Comments
 (0)