Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion extension/httpfs/include/s3fs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ class S3FileHandle : public HTTPFileHandle {

S3AuthParams auth_params;
const S3ConfigParams config_params;
bool initialized_multipart_upload;

public:
void Close() override;
Expand Down Expand Up @@ -202,7 +203,7 @@ class S3FileSystem : public HTTPFileSystem {
string InitializeMultipartUpload(S3FileHandle &file_handle);
void FinalizeMultipartUpload(S3FileHandle &file_handle);

void FlushAllBuffers(S3FileHandle &handle);
bool FlushAllBuffers(S3FileHandle &handle);

void ReadQueryParams(const string &url_query_param, S3AuthParams &params);
static ParsedS3Url S3UrlParse(string url, S3AuthParams &params);
Expand All @@ -213,6 +214,8 @@ class S3FileSystem : public HTTPFileSystem {
// Uploads the contents of write_buffer to S3.
// Note: caller is responsible to not call this method twice on the same buffer
static void UploadBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuffer> write_buffer);
static void UploadSingleBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuffer> write_buffer);
static void UploadBufferImplementation(S3FileHandle &file_handle, shared_ptr<S3WriteBuffer> write_buffer, string query_param);

vector<OpenFileInfo> Glob(const string &glob_pattern, FileOpener *opener = nullptr) override;
bool ListFiles(const string &directory, const std::function<void(const string &, bool)> &callback,
Expand Down
49 changes: 40 additions & 9 deletions extension/httpfs/s3fs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,8 @@ S3ConfigParams S3ConfigParams::ReadFrom(optional_ptr<FileOpener> opener) {
void S3FileHandle::Close() {
auto &s3fs = (S3FileSystem &)file_system;
if (flags.OpenForWriting() && !upload_finalized) {
s3fs.FlushAllBuffers(*this);
if (parts_uploaded) {
bool did_upload = s3fs.FlushAllBuffers(*this);
if (parts_uploaded && did_upload) {
s3fs.FinalizeMultipartUpload(*this);
}
}
Expand Down Expand Up @@ -336,6 +336,8 @@ string S3FileSystem::InitializeMultipartUpload(S3FileHandle &file_handle) {

open_tag_pos += 10; // Skip open tag

file_handle.initialized_multipart_upload = true;

return result.substr(open_tag_pos, close_tag_pos - open_tag_pos);
}

Expand All @@ -353,10 +355,21 @@ void S3FileSystem::NotifyUploadsInProgress(S3FileHandle &file_handle) {
}

void S3FileSystem::UploadBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuffer> write_buffer) {
auto &s3fs = (S3FileSystem &)file_handle.file_system;

string query_param = "partNumber=" + to_string(write_buffer->part_no + 1) + "&" +
"uploadId=" + S3FileSystem::UrlEncode(file_handle.multipart_upload_id, true);

UploadBufferImplementation(file_handle, write_buffer, query_param);

NotifyUploadsInProgress(file_handle);
}

void S3FileSystem::UploadSingleBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuffer> write_buffer) {
UploadBufferImplementation(file_handle, write_buffer, "");
}

void S3FileSystem::UploadBufferImplementation(S3FileHandle &file_handle, shared_ptr<S3WriteBuffer> write_buffer, string query_param) {
auto &s3fs = (S3FileSystem &)file_handle.file_system;

unique_ptr<HTTPResponse> res;
string etag;

Expand Down Expand Up @@ -405,7 +418,7 @@ void S3FileSystem::UploadBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuf
// Free up space for another thread to acquire an S3WriteBuffer
write_buffer.reset();

NotifyUploadsInProgress(file_handle);
//NotifyUploadsInProgress(file_handle);
}

void S3FileSystem::FlushBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuffer> write_buffer) {
Expand Down Expand Up @@ -442,6 +455,9 @@ void S3FileSystem::FlushBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuff
#endif
file_handle.uploads_in_progress++;
}
if (file_handle.initialized_multipart_upload == false) {
file_handle.multipart_upload_id = InitializeMultipartUpload(file_handle);
}

#ifdef SAME_THREAD_UPLOAD
UploadBuffer(file_handle, write_buffer);
Expand All @@ -455,7 +471,9 @@ void S3FileSystem::FlushBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuff
// Note that FlushAll currently does not allow to continue writing afterwards. Therefore, FinalizeMultipartUpload should
// be called right after it!
// TODO: we can fix this by keeping the last partially written buffer in memory and allow reuploading it with new data.
void S3FileSystem::FlushAllBuffers(S3FileHandle &file_handle) {
bool S3FileSystem::FlushAllBuffers(S3FileHandle &file_handle) {


// Collect references to all buffers to check
vector<shared_ptr<S3WriteBuffer>> to_flush;
file_handle.write_buffers_lock.lock();
Expand All @@ -464,6 +482,16 @@ void S3FileSystem::FlushAllBuffers(S3FileHandle &file_handle) {
}
file_handle.write_buffers_lock.unlock();

if (file_handle.initialized_multipart_upload == false) {

if (to_flush.size() == 1) {
UploadSingleBuffer(file_handle, to_flush[0]);
file_handle.upload_finalized= true;
return false;
} else {
file_handle.multipart_upload_id = InitializeMultipartUpload(file_handle);
}
}
// Flush all buffers that aren't already uploading
for (auto &write_buffer : to_flush) {
if (!write_buffer->uploading) {
Expand All @@ -476,6 +504,7 @@ void S3FileSystem::FlushAllBuffers(S3FileHandle &file_handle) {
#endif

file_handle.RethrowIOError();
return true;
}

void S3FileSystem::FinalizeMultipartUpload(S3FileHandle &file_handle) {
Expand Down Expand Up @@ -895,7 +924,8 @@ void S3FileHandle::Initialize(optional_ptr<FileOpener> opener) {
Storage::DEFAULT_BLOCK_SIZE;
D_ASSERT(part_size * max_part_count >= config_params.max_file_size);

multipart_upload_id = s3fs.InitializeMultipartUpload(*this);
initialized_multipart_upload = false;
//multipart_upload_id = s3fs.InitializeMultipartUpload(*this);
}
}

Expand Down Expand Up @@ -940,8 +970,9 @@ void S3FileSystem::RemoveDirectory(const string &path, optional_ptr<FileOpener>
void S3FileSystem::FileSync(FileHandle &handle) {
auto &s3fh = handle.Cast<S3FileHandle>();
if (!s3fh.upload_finalized) {
FlushAllBuffers(s3fh);
FinalizeMultipartUpload(s3fh);
if (FlushAllBuffers(s3fh)) {
FinalizeMultipartUpload(s3fh);
}
}
}

Expand Down
Loading