Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 6 additions & 12 deletions google/cloud/storage/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,14 @@ ObjectWriteStream Client::WriteObjectImpl(
error_stream.Close();
return error_stream;
}
auto const& current = google::cloud::internal::CurrentOptions();
auto const buffer_size = request.GetOption<UploadBufferSize>().value_or(
current.get<UploadBufferSizeOption>());
internal::ObjectWriteStreamParams params =
connection_->SetupObjectWriteStream(request).value();
return ObjectWriteStream(std::make_unique<internal::ObjectWriteStreambuf>(
connection_, request, std::move(response->upload_id),
response->committed_size, std::move(response->metadata), buffer_size,
internal::CreateHashFunction(request),
internal::HashValues{
request.GetOption<Crc32cChecksumValue>().value_or(""),
request.GetOption<MD5HashValue>().value_or(""),
},
internal::CreateHashValidator(request),
request.GetOption<AutoFinalize>().value_or(
AutoFinalizeConfig::kEnabled)));
response->committed_size, std::move(response->metadata),
params.buffer_size, std::move(params.hash_function),
std::move(params.known_hashes), std::move(params.hash_validator),
params.auto_finalize));
}

bool Client::UseSimpleUpload(std::string const& file_name, std::size_t& size) {
Expand Down
10 changes: 10 additions & 0 deletions google/cloud/storage/examples/storage_client_mock_samples.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ TEST(StorageMockingSamples, MockWriteObject) {
using gcs::internal::QueryResumableUploadResponse;
EXPECT_CALL(*mock, CreateResumableUpload)
.WillOnce(Return(CreateResumableUploadResponse{"test-only-upload-id"}));
EXPECT_CALL(*mock, SetupObjectWriteStream)
.WillOnce([](gcs::internal::ResumableUploadRequest const&) {
return google::cloud::make_status_or(
gcs::internal::ObjectWriteStreamParams{});
});
EXPECT_CALL(*mock, UploadChunk)
.WillOnce(Return(QueryResumableUploadResponse{
/*.committed_size=*/absl::nullopt,
Expand Down Expand Up @@ -139,6 +144,11 @@ TEST(StorageMockingSamples, MockWriteObjectFailure) {
using gcs::internal::QueryResumableUploadResponse;
EXPECT_CALL(*mock, CreateResumableUpload)
.WillOnce(Return(CreateResumableUploadResponse{"test-only-upload-id"}));
EXPECT_CALL(*mock, SetupObjectWriteStream)
.WillOnce([](gcs::internal::ResumableUploadRequest const&) {
return google::cloud::make_status_or(
gcs::internal::ObjectWriteStreamParams{});
});
EXPECT_CALL(*mock, UploadChunk)
.WillOnce(Return(google::cloud::Status{
google::cloud::StatusCode::kInvalidArgument, "Invalid Argument"}));
Expand Down
17 changes: 17 additions & 0 deletions google/cloud/storage/internal/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,23 @@ StatusOr<ObjectMetadata> StorageConnectionImpl::ExecuteParallelUploadFile(
return res;
}

StatusOr<ObjectWriteStreamParams> StorageConnectionImpl::SetupObjectWriteStream(
ResumableUploadRequest const& request) {
auto const& current = google::cloud::internal::CurrentOptions();
ObjectWriteStreamParams params;
params.buffer_size = request.GetOption<UploadBufferSize>().value_or(
current.get<UploadBufferSizeOption>());
params.hash_function = internal::CreateHashFunction(request);
params.known_hashes = {
request.GetOption<Crc32cChecksumValue>().value_or(""),
request.GetOption<MD5HashValue>().value_or(""),
};
params.hash_validator = internal::CreateHashValidator(request);
params.auto_finalize =
request.GetOption<AutoFinalize>().value_or(AutoFinalizeConfig::kEnabled);
return params;
}

StatusOr<ListBucketAclResponse> StorageConnectionImpl::ListBucketAcl(
ListBucketAclRequest const& request) {
auto const idempotency = current_idempotency_policy().IsIdempotent(request)
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/storage/internal/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ class StorageConnectionImpl
std::vector<std::thread> threads,
std::vector<ParallelUploadFileShard> shards,
bool ignore_cleanup_failures) override;
StatusOr<ObjectWriteStreamParams> SetupObjectWriteStream(
ResumableUploadRequest const& request) override;

StatusOr<ListBucketAclResponse> ListBucketAcl(
ListBucketAclRequest const& request) override;
Expand Down
1 change: 1 addition & 0 deletions google/cloud/storage/internal/object_write_streambuf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "google/cloud/storage/internal/object_write_streambuf.h"
#include "google/cloud/storage/internal/object_requests.h"
#include "google/cloud/storage/internal/storage_connection.h"
#include "google/cloud/storage/version.h"
#include "google/cloud/internal/make_status.h"
#include <algorithm>
Expand Down
18 changes: 17 additions & 1 deletion google/cloud/storage/internal/object_write_streambuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
#include "google/cloud/storage/auto_finalize.h"
#include "google/cloud/storage/internal/hash_function.h"
#include "google/cloud/storage/internal/hash_validator.h"
#include "google/cloud/storage/internal/storage_connection.h"
#include "google/cloud/storage/internal/object_requests.h"
#include "google/cloud/storage/version.h"
#include "google/cloud/options.h"
#include <iostream>
#include <memory>
#include <string>
Expand All @@ -31,6 +32,21 @@ namespace storage {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
class ObjectWriteStream;
namespace internal {
class StorageConnection;

/**
* The parameters to construct an `ObjectWriteStreambuf`.
*
* This is an implementation detail, only used by the library to create the
* streambuf.
*/
struct ObjectWriteStreamParams {
std::size_t buffer_size;
std::unique_ptr<HashFunction> hash_function;
HashValues known_hashes;
std::unique_ptr<HashValidator> hash_validator;
AutoFinalizeConfig auto_finalize;
};

/**
* Defines a compilation barrier for libcurl.
Expand Down
5 changes: 5 additions & 0 deletions google/cloud/storage/internal/storage_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ StatusOr<ObjectMetadata> StorageConnection::ExecuteParallelUploadFile(
"ExecuteParallelUploadFile() is not implemented by this Object");
}

StatusOr<ObjectWriteStreamParams> StorageConnection::SetupObjectWriteStream(
ResumableUploadRequest const&) {
return Status(StatusCode::kUnimplemented, "unimplemented");
}

} // namespace internal
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace storage
Expand Down
3 changes: 3 additions & 0 deletions google/cloud/storage/internal/storage_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "google/cloud/storage/internal/object_acl_requests.h"
#include "google/cloud/storage/internal/object_read_source.h"
#include "google/cloud/storage/internal/object_requests.h"
#include "google/cloud/storage/internal/object_write_streambuf.h"
#include "google/cloud/storage/internal/service_account_requests.h"
#include "google/cloud/storage/internal/sign_blob_requests.h"
#include "google/cloud/storage/oauth2/credentials.h"
Expand Down Expand Up @@ -122,6 +123,8 @@ class StorageConnection {
}
virtual StatusOr<ObjectMetadata> ExecuteParallelUploadFile(
std::vector<std::thread>, std::vector<ParallelUploadFileShard>, bool);
virtual StatusOr<ObjectWriteStreamParams> SetupObjectWriteStream(
ResumableUploadRequest const&);
///@}

///@{
Expand Down
7 changes: 7 additions & 0 deletions google/cloud/storage/internal/storage_connection_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,13 @@ TEST(StorageConnectionTest, ExecuteParallelUploadFileUnimplemented) {
EXPECT_THAT(response, StatusIs(StatusCode::kUnimplemented));
}

TEST(StorageConnectionTest, SetupObjectWriteStreamUnimplemented) {
TestStorageConnection connection;
ResumableUploadRequest request;
auto response = connection.SetupObjectWriteStream(request);
EXPECT_THAT(response, StatusIs(StatusCode::kUnimplemented));
}

} // namespace
} // namespace internal
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down
9 changes: 9 additions & 0 deletions google/cloud/storage/internal/tracing_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,15 @@ StatusOr<storage::ObjectMetadata> TracingConnection::ExecuteParallelUploadFile(
ignore_cleanup_failures));
}

StatusOr<storage::internal::ObjectWriteStreamParams>
TracingConnection::SetupObjectWriteStream(
storage::internal::ResumableUploadRequest const& request) {
auto span =
internal::MakeSpan("storage::Client::WriteObject/SetupObjectWriteStream");
auto scope = opentelemetry::trace::Scope(span);
return internal::EndSpan(*span, impl_->SetupObjectWriteStream(request));
}

StatusOr<storage::internal::ListBucketAclResponse>
TracingConnection::ListBucketAcl(
storage::internal::ListBucketAclRequest const& request) {
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/storage/internal/tracing_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ class TracingConnection : public storage::internal::StorageConnection {
std::vector<std::thread> threads,
std::vector<storage::internal::ParallelUploadFileShard> shards,
bool ignore_cleanup_failures) override;
StatusOr<storage::internal::ObjectWriteStreamParams> SetupObjectWriteStream(
storage::internal::ResumableUploadRequest const& request) override;

StatusOr<storage::internal::ListBucketAclResponse> ListBucketAcl(
storage::internal::ListBucketAclRequest const& request) override;
Expand Down
26 changes: 26 additions & 0 deletions google/cloud/storage/internal/tracing_connection_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,32 @@ TEST(TracingClientTest, ExecuteParallelUploadFile) {
OTelAttribute<std::string>("gl-cpp.status_code", code_str)))));
}

TEST(TracingClientTest, SetupObjectWriteStream) {
auto span_catcher = InstallSpanCatcher();
auto mock = std::make_shared<MockClient>();
EXPECT_CALL(*mock, SetupObjectWriteStream).WillOnce([](auto const&) {
EXPECT_TRUE(ThereIsAnActiveSpan());
return PermanentError();
});
auto under_test = TracingConnection(mock);
storage::internal::ResumableUploadRequest request("test-bucket",
"test-object");
auto actual = under_test.SetupObjectWriteStream(request);

auto const code = PermanentError().code();
auto const code_str = StatusCodeToString(code);
auto const msg = PermanentError().message();
EXPECT_THAT(actual, StatusIs(code));
EXPECT_THAT(
span_catcher->GetSpans(),
ElementsAre(AllOf(
SpanNamed("storage::Client::WriteObject/SetupObjectWriteStream"),
SpanHasInstrumentationScope(), SpanKindIsClient(),
SpanWithStatus(opentelemetry::trace::StatusCode::kError, msg),
SpanHasAttributes(
OTelAttribute<std::string>("gl-cpp.status_code", code_str)))));
}

TEST(TracingClientTest, ListBucketAcl) {
auto span_catcher = InstallSpanCatcher();
auto mock = std::make_shared<MockClient>();
Expand Down
3 changes: 3 additions & 0 deletions google/cloud/storage/testing/mock_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ class MockClient : public google::cloud::storage::internal::StorageConnection {
(std::vector<std::thread>,
std::vector<storage::internal::ParallelUploadFileShard>, bool),
(override));
MOCK_METHOD(StatusOr<internal::ObjectWriteStreamParams>,
SetupObjectWriteStream,
(storage::internal::ResumableUploadRequest const&), (override));

MOCK_METHOD(StatusOr<internal::ListBucketAclResponse>, ListBucketAcl,
(internal::ListBucketAclRequest const&), (override));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,16 @@ class ObjectWriteStreambufIntegrationTest
ASSERT_STATUS_OK(create);

auto constexpr kTestUploadBufferSize = 16 * 1024 * 1024L;
internal::ObjectWriteStreamParams params;
params.buffer_size = kTestUploadBufferSize;
params.hash_function = CreateNullHashFunction();
params.hash_validator = CreateNullHashValidator();
params.auto_finalize = AutoFinalizeConfig::kEnabled;
ObjectWriteStream writer(std::make_unique<ObjectWriteStreambuf>(
connection, request, std::move(create->upload_id), /*committed_size=*/0,
/*metadata=*/absl::nullopt, kTestUploadBufferSize,
CreateNullHashFunction(), HashValues{}, CreateNullHashValidator(),
AutoFinalizeConfig::kEnabled));
/*metadata=*/absl::nullopt, params.buffer_size,
std::move(params.hash_function), std::move(params.known_hashes),
std::move(params.hash_validator), params.auto_finalize));

std::ostringstream expected_stream;
WriteRandomLines(writer, expected_stream, line_count, line_size);
Expand Down
Loading