From 438634625ebab6173e66e6e27fcccb0f5ec12a09 Mon Sep 17 00:00:00 2001 From: Carlo Piovesan Date: Mon, 1 Sep 2025 07:09:00 +0200 Subject: [PATCH] When single fragment is to be uploaded, do single PUT --- extension/httpfs/include/s3fs.hpp | 5 +++- extension/httpfs/s3fs.cpp | 49 +++++++++++++++++++++++++------ 2 files changed, 44 insertions(+), 10 deletions(-) diff --git a/extension/httpfs/include/s3fs.hpp b/extension/httpfs/include/s3fs.hpp index 7b9ad8d..6d880d3 100644 --- a/extension/httpfs/include/s3fs.hpp +++ b/extension/httpfs/include/s3fs.hpp @@ -126,6 +126,7 @@ class S3FileHandle : public HTTPFileHandle { S3AuthParams auth_params; const S3ConfigParams config_params; + bool initialized_multipart_upload; public: void Close() override; @@ -202,7 +203,7 @@ class S3FileSystem : public HTTPFileSystem { string InitializeMultipartUpload(S3FileHandle &file_handle); void FinalizeMultipartUpload(S3FileHandle &file_handle); - void FlushAllBuffers(S3FileHandle &handle); + bool FlushAllBuffers(S3FileHandle &handle); void ReadQueryParams(const string &url_query_param, S3AuthParams &params); static ParsedS3Url S3UrlParse(string url, S3AuthParams &params); @@ -213,6 +214,8 @@ class S3FileSystem : public HTTPFileSystem { // Uploads the contents of write_buffer to S3. 
// Note: caller is responsible to not call this method twice on the same buffer static void UploadBuffer(S3FileHandle &file_handle, shared_ptr write_buffer); + static void UploadSingleBuffer(S3FileHandle &file_handle, shared_ptr write_buffer); + static void UploadBufferImplementation(S3FileHandle &file_handle, shared_ptr write_buffer, string query_param); vector Glob(const string &glob_pattern, FileOpener *opener = nullptr) override; bool ListFiles(const string &directory, const std::function &callback, diff --git a/extension/httpfs/s3fs.cpp b/extension/httpfs/s3fs.cpp index edd5cea..882e10c 100644 --- a/extension/httpfs/s3fs.cpp +++ b/extension/httpfs/s3fs.cpp @@ -299,8 +299,8 @@ S3ConfigParams S3ConfigParams::ReadFrom(optional_ptr opener) { void S3FileHandle::Close() { auto &s3fs = (S3FileSystem &)file_system; if (flags.OpenForWriting() && !upload_finalized) { - s3fs.FlushAllBuffers(*this); - if (parts_uploaded) { + bool did_upload = s3fs.FlushAllBuffers(*this); + if (parts_uploaded && did_upload) { s3fs.FinalizeMultipartUpload(*this); } } @@ -336,6 +336,8 @@ string S3FileSystem::InitializeMultipartUpload(S3FileHandle &file_handle) { open_tag_pos += 10; // Skip open tag + file_handle.initialized_multipart_upload = true; + return result.substr(open_tag_pos, close_tag_pos - open_tag_pos); } @@ -353,10 +355,21 @@ void S3FileSystem::NotifyUploadsInProgress(S3FileHandle &file_handle) { } void S3FileSystem::UploadBuffer(S3FileHandle &file_handle, shared_ptr write_buffer) { - auto &s3fs = (S3FileSystem &)file_handle.file_system; - string query_param = "partNumber=" + to_string(write_buffer->part_no + 1) + "&" + "uploadId=" + S3FileSystem::UrlEncode(file_handle.multipart_upload_id, true); + + UploadBufferImplementation(file_handle, write_buffer, query_param); + + NotifyUploadsInProgress(file_handle); +} + +void S3FileSystem::UploadSingleBuffer(S3FileHandle &file_handle, shared_ptr write_buffer) { + UploadBufferImplementation(file_handle, write_buffer, ""); +} + +void 
S3FileSystem::UploadBufferImplementation(S3FileHandle &file_handle, shared_ptr write_buffer, string query_param) { + auto &s3fs = (S3FileSystem &)file_handle.file_system; + unique_ptr res; string etag; @@ -405,7 +418,7 @@ void S3FileSystem::UploadBuffer(S3FileHandle &file_handle, shared_ptr write_buffer) { @@ -442,6 +455,9 @@ void S3FileSystem::FlushBuffer(S3FileHandle &file_handle, shared_ptr> to_flush; file_handle.write_buffers_lock.lock(); @@ -464,6 +482,16 @@ void S3FileSystem::FlushAllBuffers(S3FileHandle &file_handle) { } file_handle.write_buffers_lock.unlock(); + if (file_handle.initialized_multipart_upload == false) { + + if (to_flush.size() == 1) { + UploadSingleBuffer(file_handle, to_flush[0]); + file_handle.upload_finalized= true; + return false; + } else { + file_handle.multipart_upload_id = InitializeMultipartUpload(file_handle); + } + } // Flush all buffers that aren't already uploading for (auto &write_buffer : to_flush) { if (!write_buffer->uploading) { @@ -476,6 +504,7 @@ void S3FileSystem::FlushAllBuffers(S3FileHandle &file_handle) { #endif file_handle.RethrowIOError(); + return true; } void S3FileSystem::FinalizeMultipartUpload(S3FileHandle &file_handle) { @@ -895,7 +924,8 @@ void S3FileHandle::Initialize(optional_ptr opener) { Storage::DEFAULT_BLOCK_SIZE; D_ASSERT(part_size * max_part_count >= config_params.max_file_size); - multipart_upload_id = s3fs.InitializeMultipartUpload(*this); + initialized_multipart_upload = false; + //multipart_upload_id = s3fs.InitializeMultipartUpload(*this); } } @@ -940,8 +970,9 @@ void S3FileSystem::RemoveDirectory(const string &path, optional_ptr void S3FileSystem::FileSync(FileHandle &handle) { auto &s3fh = handle.Cast(); if (!s3fh.upload_finalized) { - FlushAllBuffers(s3fh); - FinalizeMultipartUpload(s3fh); + if (FlushAllBuffers(s3fh)) { + FinalizeMultipartUpload(s3fh); + } } }