From 7f0baf553b1b57ae080e574e17af3dd3a88896ee Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Mon, 24 Jun 2024 11:47:56 +0200 Subject: [PATCH 01/16] update --- object_store/src/upload.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index e5f683a034ac..471e4870bae3 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -204,7 +204,7 @@ impl WriteMultipart { } /// Flush final chunk, and await completion of all in-flight requests - pub async fn finish(mut self) -> Result { + pub async fn finish(&mut self) -> Result { if !self.buffer.is_empty() { let part = std::mem::take(&mut self.buffer); self.put_part(part.into()) From 56cb002cdc1fa6115ac62d3c05b3257bb57c0335 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Mon, 24 Jun 2024 12:02:13 +0200 Subject: [PATCH 02/16] another one --- object_store/src/upload.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index 471e4870bae3..74da6480cebc 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -198,7 +198,7 @@ impl WriteMultipart { } /// Abort this upload, attempting to clean up any successfully uploaded parts - pub async fn abort(mut self) -> Result<()> { + pub async fn abort(&mut self) -> Result<()> { self.tasks.shutdown().await; self.upload.abort().await } From 2a031a8b40a41a8cd6ae358b1566492e0cb61883 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Mon, 24 Jun 2024 12:33:04 +0200 Subject: [PATCH 03/16] more update --- object_store/src/buffered.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index c7b71aa1cc2d..0c70db866312 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -438,7 +438,12 @@ impl AsyncWrite for BufWriter { } BufWriterState::Flush(f) => return f.poll_unpin(cx).map_err(std::io::Error::from), BufWriterState::Write(x) => { - let upload = x.take().unwrap(); + let mut upload = x.take().ok_or({ + std::io::Error::new( + ErrorKind::InvalidInput, + "Cannot shutdown a writer that has already been shut down", + ) + })?; self.state = BufWriterState::Flush( async move { upload.finish().await?; From 4ac0bd7865a16dbe2b02b8cdfc9f1c6e405238da Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Mon, 24 Jun 2024 12:40:26 +0200 Subject: [PATCH 04/16] another update --- object_store/src/buffered.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index 0c70db866312..07a08020dbda 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -438,7 +438,7 @@ impl AsyncWrite for BufWriter { } BufWriterState::Flush(f) => return f.poll_unpin(cx).map_err(std::io::Error::from), BufWriterState::Write(x) => { - let mut upload = x.take().ok_or({ + let mut upload = x.take().ok_or_else(|| { std::io::Error::new( ErrorKind::InvalidInput, "Cannot shutdown a writer that has already been shut down", From 9fc37140c099d1d7bd84f169d0eeac44103e8257 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Mon, 24 Jun 2024 13:08:39 +0200 Subject: [PATCH 05/16] debug --- object_store/src/upload.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index 74da6480cebc..3da030e1069b 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -205,13 +205,19 @@ impl WriteMultipart { /// Flush final chunk, and await completion of all in-flight requests pub async fn finish(&mut self) -> Result { + println!("finishing"); if !self.buffer.is_empty() { + println!("finishing -- not empty"); let part = std::mem::take(&mut self.buffer); self.put_part(part.into()) } + println!("finishing -- waiting for capacity"); self.wait_for_capacity(0).await?; - self.upload.complete().await + println!("finishing -- complete"); + let result = self.upload.complete().await; + println!("finishing -- done"); + result } } From 7cdb38fcc5d50f68d7c034d6fedb45f7ecc47df9 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Mon, 24 Jun 2024 14:10:48 +0200 Subject: [PATCH 06/16] debug --- object_store/src/local.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/object_store/src/local.rs b/object_store/src/local.rs index 1ce588af2144..2fbfc1ffa712 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -748,6 +748,7 @@ impl MultipartUpload for LocalUpload { let s = Arc::clone(&self.state); maybe_spawn_blocking(move || { + println!("Writing part: {}", data.content_length()); let mut f = s.file.lock(); let file = f.as_mut().context(AbortedSnafu)?; file.seek(SeekFrom::Start(offset)) @@ -767,6 +768,7 @@ impl MultipartUpload for LocalUpload { let s = Arc::clone(&self.state); maybe_spawn_blocking(move || { // Ensure no inflight writes + println!("Completing"); let f = s.file.lock().take().context(AbortedSnafu)?; std::fs::rename(&src, &s.dest).context(UnableToRenameFileSnafu)?; let metadata = f.metadata().map_err(|e| Error::Metadata { From f836c0a231d04f99a1ff59240d18951aded49fcd Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Mon, 24 Jun 2024 15:54:27 +0200 Subject: [PATCH 07/16] some updates --- object_store/src/local.rs | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/object_store/src/local.rs b/object_store/src/local.rs index 2fbfc1ffa712..067fec01843a 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -29,7 +29,6 @@ use bytes::Bytes; use chrono::{DateTime, Utc}; use futures::{stream::BoxStream, StreamExt}; use futures::{FutureExt, TryStreamExt}; -use parking_lot::Mutex; use snafu::{ensure, OptionExt, ResultExt, Snafu}; use url::Url; use walkdir::{DirEntry, WalkDir}; @@ -724,7 +723,7 @@ struct LocalUpload { #[derive(Debug)] struct UploadState { dest: PathBuf, - file: Mutex>, + file: Arc, } impl LocalUpload { @@ -732,7 +731,7 @@ impl LocalUpload { Self { state: Arc::new(UploadState { dest, - file: Mutex::new(Some(file)), + file: Arc::new(file), }), src: Some(src), offset: 0, @@ -747,15 +746,14 @@ impl MultipartUpload for LocalUpload { self.offset += data.content_length() as u64; let s = Arc::clone(&self.state); + let file = Arc::clone(&s.file); maybe_spawn_blocking(move || { - println!("Writing part: {}", data.content_length()); - let mut f = s.file.lock(); - let file = f.as_mut().context(AbortedSnafu)?; - file.seek(SeekFrom::Start(offset)) + (&*file) + .seek(SeekFrom::Start(offset)) .context(SeekSnafu { path: &s.dest })?; data.iter() - .try_for_each(|x| file.write_all(x)) + .try_for_each(|x| (&*file).write_all(x)) .context(UnableToCopyDataToFileSnafu)?; Ok(()) @@ -766,12 +764,10 @@ impl MultipartUpload for LocalUpload { async fn complete(&mut self) -> Result { let src = self.src.take().context(AbortedSnafu)?; let s = Arc::clone(&self.state); + let file = Arc::clone(&s.file); maybe_spawn_blocking(move || { - // Ensure no inflight writes - println!("Completing"); - let f = s.file.lock().take().context(AbortedSnafu)?; std::fs::rename(&src, &s.dest).context(UnableToRenameFileSnafu)?; - let metadata = f.metadata().map_err(|e| Error::Metadata { + let metadata = file.as_ref().metadata().map_err(|e| Error::Metadata { source: e.into(), path: src.to_string_lossy().to_string(), })?; From 3d6226558a3bf2023aa8f4a187d5624146e87f0a Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Mon, 24 Jun 2024 16:01:58 +0200 Subject: [PATCH 08/16] debug --- object_store/src/local.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/object_store/src/local.rs b/object_store/src/local.rs index 067fec01843a..8c5b3cd2ab44 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -742,18 +742,23 @@ impl LocalUpload { #[async_trait] impl MultipartUpload for LocalUpload { fn put_part(&mut self, data: PutPayload) -> UploadPart { + println!("put_part"); let offset = self.offset; self.offset += data.content_length() as u64; let s = Arc::clone(&self.state); let file = Arc::clone(&s.file); maybe_spawn_blocking(move || { + println!("seek"); (&*file) .seek(SeekFrom::Start(offset)) .context(SeekSnafu { path: &s.dest })?; data.iter() - .try_for_each(|x| (&*file).write_all(x)) + .try_for_each(|x| { + println!("write all"); + (&*file).write_all(x) + }) .context(UnableToCopyDataToFileSnafu)?; Ok(()) @@ -764,14 +769,15 @@ impl MultipartUpload for LocalUpload { async fn complete(&mut self) -> Result { let src = self.src.take().context(AbortedSnafu)?; let s = Arc::clone(&self.state); - let file = Arc::clone(&s.file); maybe_spawn_blocking(move || { + println!("complete"); std::fs::rename(&src, &s.dest).context(UnableToRenameFileSnafu)?; - let metadata = file.as_ref().metadata().map_err(|e| Error::Metadata { + let metadata = s.file.as_ref().metadata().map_err(|e| Error::Metadata { source: e.into(), path: src.to_string_lossy().to_string(), })?; + println!("complete -- done"); Ok(PutResult { e_tag: Some(get_etag(&metadata)), version: None, @@ -783,7 +789,9 @@ impl MultipartUpload for LocalUpload { async fn abort(&mut self) -> Result<()> { let src = self.src.take().context(AbortedSnafu)?; maybe_spawn_blocking(move || { + println!("remove"); std::fs::remove_file(&src).context(UnableToDeleteFileSnafu { path: &src })?; + println!("remove -- done"); Ok(()) }) .await From 95012d6d09d338ed073d6f35bf0e7f1449edf97e Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Mon, 24 Jun 2024 16:10:40 +0200 Subject: [PATCH 09/16] debug --- object_store/src/local.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/object_store/src/local.rs b/object_store/src/local.rs index 8c5b3cd2ab44..9c79122b8196 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -757,7 +757,13 @@ impl MultipartUpload for LocalUpload { data.iter() .try_for_each(|x| { println!("write all"); - (&*file).write_all(x) + match (&*file).write(x) { + Ok(_) => Ok(()), + Err(e) => { + println!("write all error: {:?}", e); + Err(e) + } + } }) .context(UnableToCopyDataToFileSnafu)?; From 4e1cfba5d964f52311db0e96b3883251e8230ed7 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Fri, 28 Jun 2024 11:20:52 +0200 Subject: [PATCH 10/16] cleanup --- object_store/src/upload.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index 3da030e1069b..74da6480cebc 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -205,19 +205,13 @@ impl WriteMultipart { /// Flush final chunk, and await completion of all in-flight requests pub async fn finish(&mut self) -> Result { - println!("finishing"); if !self.buffer.is_empty() { - println!("finishing -- not empty"); let part = std::mem::take(&mut self.buffer); self.put_part(part.into()) } - println!("finishing -- waiting for capacity"); self.wait_for_capacity(0).await?; - println!("finishing -- complete"); - let result = self.upload.complete().await; - println!("finishing -- done"); - result + self.upload.complete().await } } From 843f375d2266b361c24192d42f13e80a3bea6ff8 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Fri, 28 Jun 2024 11:25:08 +0200 Subject: [PATCH 11/16] cleanup --- object_store/src/local.rs | 32 ++++++++++---------------------- 1 file changed, 10 insertions(+), 22 deletions(-) diff --git a/object_store/src/local.rs b/object_store/src/local.rs index 9c79122b8196..1ce588af2144 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -29,6 +29,7 @@ use bytes::Bytes; use chrono::{DateTime, Utc}; use futures::{stream::BoxStream, StreamExt}; use futures::{FutureExt, TryStreamExt}; +use parking_lot::Mutex; use snafu::{ensure, OptionExt, ResultExt, Snafu}; use url::Url; use walkdir::{DirEntry, WalkDir}; @@ -723,7 +724,7 @@ struct LocalUpload { #[derive(Debug)] struct UploadState { dest: PathBuf, - file: Arc, + file: Mutex>, } impl LocalUpload { @@ -731,7 +732,7 @@ impl LocalUpload { Self { state: Arc::new(UploadState { dest, - file: Arc::new(file), + file: Mutex::new(Some(file)), }), src: Some(src), offset: 0, @@ -742,29 +743,18 @@ impl LocalUpload { #[async_trait] impl MultipartUpload for LocalUpload { fn put_part(&mut self, data: PutPayload) -> UploadPart { - println!("put_part"); let offset = self.offset; self.offset += data.content_length() as u64; let s = Arc::clone(&self.state); - let file = Arc::clone(&s.file); maybe_spawn_blocking(move || { - println!("seek"); - (&*file) - .seek(SeekFrom::Start(offset)) + let mut f = s.file.lock(); + let file = f.as_mut().context(AbortedSnafu)?; + file.seek(SeekFrom::Start(offset)) .context(SeekSnafu { path: &s.dest })?; data.iter() - .try_for_each(|x| { - println!("write all"); - match (&*file).write(x) { - Ok(_) => Ok(()), - Err(e) => { - println!("write all error: {:?}", e); - Err(e) - } - } - }) + .try_for_each(|x| file.write_all(x)) .context(UnableToCopyDataToFileSnafu)?; Ok(()) @@ -776,14 +766,14 @@ impl MultipartUpload for LocalUpload { let src = self.src.take().context(AbortedSnafu)?; let s = Arc::clone(&self.state); maybe_spawn_blocking(move || { - println!("complete"); + // Ensure no inflight writes + let f = s.file.lock().take().context(AbortedSnafu)?; std::fs::rename(&src, &s.dest).context(UnableToRenameFileSnafu)?; - let metadata = s.file.as_ref().metadata().map_err(|e| Error::Metadata { + let metadata = f.metadata().map_err(|e| Error::Metadata { source: e.into(), path: src.to_string_lossy().to_string(), })?; - println!("complete -- done"); Ok(PutResult { e_tag: Some(get_etag(&metadata)), version: None, @@ -795,9 +785,7 @@ impl MultipartUpload for LocalUpload { async fn abort(&mut self) -> Result<()> { let src = self.src.take().context(AbortedSnafu)?; maybe_spawn_blocking(move || { - println!("remove"); std::fs::remove_file(&src).context(UnableToDeleteFileSnafu { path: &src })?; - println!("remove -- done"); Ok(()) }) .await From 9b90e47257bcc4f9f20043515125c1f71c461857 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Fri, 28 Jun 2024 11:33:50 +0200 Subject: [PATCH 12/16] simplify --- object_store/src/local.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/object_store/src/local.rs b/object_store/src/local.rs index 1ce588af2144..d3bfab8ededd 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -724,7 +724,7 @@ struct LocalUpload { #[derive(Debug)] struct UploadState { dest: PathBuf, - file: Mutex>, + file: Mutex, } impl LocalUpload { @@ -732,7 +732,7 @@ impl LocalUpload { Self { state: Arc::new(UploadState { dest, - file: Mutex::new(Some(file)), + file: Mutex::new(file), }), src: Some(src), offset: 0, @@ -748,8 +748,7 @@ impl MultipartUpload for LocalUpload { let s = Arc::clone(&self.state); maybe_spawn_blocking(move || { - let mut f = s.file.lock(); - let file = f.as_mut().context(AbortedSnafu)?; + let mut file = s.file.lock(); file.seek(SeekFrom::Start(offset)) .context(SeekSnafu { path: &s.dest })?; @@ -767,9 +766,9 @@ impl MultipartUpload for LocalUpload { let s = Arc::clone(&self.state); maybe_spawn_blocking(move || { // Ensure no inflight writes - let f = s.file.lock().take().context(AbortedSnafu)?; + let file = s.file.lock(); std::fs::rename(&src, &s.dest).context(UnableToRenameFileSnafu)?; - let metadata = f.metadata().map_err(|e| Error::Metadata { + let metadata = file.metadata().map_err(|e| Error::Metadata { source: e.into(), path: src.to_string_lossy().to_string(), })?; From 78618116028d4d5fe75e9ec9d57114db23a58578 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Fri, 28 Jun 2024 14:41:50 +0200 Subject: [PATCH 13/16] address some comments --- object_store/src/upload.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index 74da6480cebc..471e4870bae3 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -198,7 +198,7 @@ impl WriteMultipart { } /// Abort this upload, attempting to clean up any successfully uploaded parts - pub async fn abort(&mut self) -> Result<()> { + pub async fn abort(mut self) -> Result<()> { self.tasks.shutdown().await; self.upload.abort().await } From ca90a752fe205008164664815e572820076a5b38 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Tue, 2 Jul 2024 10:50:53 +0200 Subject: [PATCH 14/16] cleanup on failure --- object_store/src/buffered.rs | 15 +-------------- object_store/src/upload.rs | 18 ++++++++++-------- 2 files changed, 11 insertions(+), 22 deletions(-) diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index 07a08020dbda..5ddc10e0f39d 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -340,19 +340,6 @@ impl BufWriter { }; } } - - /// Abort this writer, cleaning up any partially uploaded state - /// - /// # Panic - /// - /// Panics if this writer has already been shutdown or aborted - pub async fn abort(&mut self) -> crate::Result<()> { - match &mut self.state { - BufWriterState::Buffer(_, _) | BufWriterState::Prepare(_) => Ok(()), - BufWriterState::Flush(_) => panic!("Already shut down"), - BufWriterState::Write(x) => x.take().unwrap().abort().await, - } - } } impl AsyncWrite for BufWriter { @@ -438,7 +425,7 @@ impl AsyncWrite for BufWriter { } BufWriterState::Flush(f) => return f.poll_unpin(cx).map_err(std::io::Error::from), BufWriterState::Write(x) => { - let mut upload = x.take().ok_or_else(|| { + let upload = x.take().ok_or_else(|| { std::io::Error::new( ErrorKind::InvalidInput, "Cannot shutdown a writer that has already been shut down", diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index 471e4870bae3..63176046cd74 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -197,21 +197,23 @@ impl WriteMultipart { self.tasks.spawn(self.upload.put_part(part)); } - /// Abort this upload, attempting to clean up any successfully uploaded parts - pub async fn abort(mut self) -> Result<()> { - self.tasks.shutdown().await; - self.upload.abort().await - } - /// Flush final chunk, and await completion of all in-flight requests - pub async fn finish(&mut self) -> Result { + pub async fn finish(mut self) -> Result { if !self.buffer.is_empty() { let part = std::mem::take(&mut self.buffer); self.put_part(part.into()) } self.wait_for_capacity(0).await?; - self.upload.complete().await + + match self.upload.complete().await { + Err(e) => { + self.tasks.shutdown().await; + self.upload.abort().await?; + Err(e) + } + Ok(result) => Ok(result), + } } } From c119ef13d11bd4f64c6a0d0a06a68af7b9712dc7 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Tue, 2 Jul 2024 11:12:05 +0200 Subject: [PATCH 15/16] restore abort method --- object_store/src/buffered.rs | 13 +++++++++++++ object_store/src/upload.rs | 5 +++++ 2 files changed, 18 insertions(+) diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index 5ddc10e0f39d..fcd7e064e7c1 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -340,6 +340,19 @@ impl BufWriter { }; } } + + /// Abort this writer, cleaning up any partially uploaded state + /// + /// # Panic + /// + /// Panics if this writer has already been shutdown or aborted + pub async fn abort(&mut self) -> crate::Result<()> { + match &mut self.state { + BufWriterState::Buffer(_, _) | BufWriterState::Prepare(_) => Ok(()), + BufWriterState::Flush(_) => panic!("Already shut down"), + BufWriterState::Write(x) => x.take().unwrap().abort().await, + } + } } impl AsyncWrite for BufWriter { diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index 63176046cd74..c05e45c6f054 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -197,6 +197,11 @@ impl WriteMultipart { self.tasks.spawn(self.upload.put_part(part)); } + pub async fn abort(mut self) -> Result<()> { + self.tasks.shutdown().await; + self.upload.abort().await + } + /// Flush final chunk, and await completion of all in-flight requests pub async fn finish(mut self) -> Result { if !self.buffer.is_empty() { From c850560822d2fc353d6d4ce3f3a5096388f75e3d Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Tue, 2 Jul 2024 11:44:42 +0200 Subject: [PATCH 16/16] docs --- object_store/src/upload.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index c05e45c6f054..335d27ddaefb 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -197,6 +197,7 @@ impl WriteMultipart { self.tasks.spawn(self.upload.put_part(part)); } + /// Abort this upload, attempting to clean up any successfully uploaded parts pub async fn abort(mut self) -> Result<()> { self.tasks.shutdown().await; self.upload.abort().await