Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 15 additions & 57 deletions src/main/java/com/instaclustr/esop/s3/v2/BaseS3Backuper.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,10 @@
import com.instaclustr.esop.s3.v2.S3ClientsFactory.S3Clients;
import com.instaclustr.threading.Executors;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.core.waiters.WaiterOverrideConfiguration;
import software.amazon.awssdk.core.waiters.WaiterResponse;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse;
import software.amazon.awssdk.services.s3.model.GetObjectTaggingRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.MetadataDirective;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.ObjectAttributes;
Expand All @@ -36,62 +32,52 @@
import software.amazon.awssdk.services.s3.model.Tagging;

import static java.nio.charset.StandardCharsets.UTF_8;
import static software.amazon.awssdk.core.retry.RetryMode.STANDARD;
import static software.amazon.awssdk.core.retry.backoff.BackoffStrategy.defaultStrategy;

public class BaseS3Backuper extends Backuper
{
public class BaseS3Backuper extends Backuper {
private static final Logger logger = LoggerFactory.getLogger(BaseS3Backuper.class);
private ExecutorService executorService;

public final S3Clients s3Clients;
public final BucketService s3BucketService;

public BaseS3Backuper(final S3Clients s3Clients,
final BackupOperationRequest request)
{
final BackupOperationRequest request) {
super(request);
this.s3Clients = s3Clients;
this.executorService = new Executors.FixedTasksExecutorSupplier().get(100);
this.s3BucketService = new BaseS3BucketService(s3Clients);
}

public BaseS3Backuper(final S3Clients s3Clients,
final BackupCommitLogsOperationRequest request)
{
final BackupCommitLogsOperationRequest request) {
super(request);
this.s3Clients = s3Clients;
this.executorService = new Executors.FixedTasksExecutorSupplier().get(100);
this.s3BucketService = new BaseS3BucketService(s3Clients);
}

@Override
public RemoteObjectReference objectKeyToRemoteReference(final Path objectKey)
{
public RemoteObjectReference objectKeyToRemoteReference(final Path objectKey) {
return new S3RemoteObjectReference(objectKey, objectKey.toString());
}

@Override
public RemoteObjectReference objectKeyToNodeAwareRemoteReference(final Path objectKey)
{
public RemoteObjectReference objectKeyToNodeAwareRemoteReference(final Path objectKey) {
return new S3RemoteObjectReference(objectKey, resolveNodeAwareRemotePath(objectKey));
}


@Override
protected void cleanup() throws Exception
{
protected void cleanup() throws Exception {
s3Clients.close();
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
}

@Override
public FreshenResult freshenRemoteObject(ManifestEntry manifestEntry, RemoteObjectReference object) throws Exception
{
public FreshenResult freshenRemoteObject(ManifestEntry manifestEntry, RemoteObjectReference object) {
List<Tag> tags;
try
{
try {
tags = s3Clients.getNonEncryptingClient()
.getObjectTagging(GetObjectTaggingRequest.builder()
.bucket(request.storageLocation.bucket)
Expand Down Expand Up @@ -133,8 +119,7 @@ public FreshenResult freshenRemoteObject(ManifestEntry manifestEntry, RemoteObje
// we want to preserve whatever tags it had
Tagging tagging = taggingBuilder.tagSet(tags).build();

if (!request.skipRefreshing)
{
if (!request.skipRefreshing) {
CopyObjectRequest copyObjectRequest = CopyObjectRequest.builder()
.sourceBucket(request.storageLocation.bucket)
.destinationBucket(request.storageLocation.bucket)
Expand All @@ -161,19 +146,16 @@ public FreshenResult freshenRemoteObject(ManifestEntry manifestEntry, RemoteObje
}

@Override
public void uploadFile(ManifestEntry manifestEntry, InputStream localFileStream, RemoteObjectReference objectReference) throws Exception
{
public void uploadFile(ManifestEntry manifestEntry, InputStream localFileStream, RemoteObjectReference objectReference) {
logger.info("Uploading {}", objectReference.canonicalPath);
s3Clients.getNonEncryptingClient()
.putObject(getPutObjectRequest(objectReference, manifestEntry.size),
RequestBody.fromInputStream(localFileStream, manifestEntry.size));
}

@Override
public void uploadEncryptedFile(ManifestEntry manifestEntry, InputStream localFileStream, RemoteObjectReference objectReference) throws Exception
{
if (!s3Clients.getEncryptingClient().isPresent())
{
public void uploadEncryptedFile(ManifestEntry manifestEntry, InputStream localFileStream, RemoteObjectReference objectReference) {
if (!s3Clients.getEncryptingClient().isPresent()) {
uploadFile(manifestEntry, localFileStream, objectReference);
return;
}
Expand Down Expand Up @@ -204,8 +186,7 @@ public void uploadEncryptedFile(ManifestEntry manifestEntry, InputStream localFi
}

@Override
public void uploadText(String text, RemoteObjectReference objectReference) throws Exception
{
public void uploadText(String text, RemoteObjectReference objectReference) throws Exception {
logger.info("Uploading {}", objectReference.canonicalPath);
byte[] bytes = text.getBytes(UTF_8);

Expand All @@ -215,8 +196,7 @@ public void uploadText(String text, RemoteObjectReference objectReference) throw
}

@Override
public void uploadEncryptedText(String plainText, RemoteObjectReference objectReference) throws Exception
{
public void uploadEncryptedText(String plainText, RemoteObjectReference objectReference) throws Exception {
if (!s3Clients.getEncryptingClient().isPresent()) {
uploadText(plainText, objectReference);
return;
Expand All @@ -230,31 +210,9 @@ public void uploadEncryptedText(String plainText, RemoteObjectReference objectRe
RequestBody.fromBytes(bytes));
}

private void waitForCompletion(RemoteObjectReference objectReference) throws Exception
{
WaiterResponse<HeadObjectResponse> response = s3Clients.getClient()
.waiter()
.waitUntilObjectExists(HeadObjectRequest.builder()
.bucket(request.storageLocation.bucket)
.key(objectReference.canonicalPath)
.build(),
WaiterOverrideConfiguration.builder()
.backoffStrategy(defaultStrategy(STANDARD))
.build());

if (response.matched().exception().isPresent())
{
logger.debug("Failed to upload {}.", objectReference.canonicalPath);
throw new RuntimeException(response.matched().exception().get());
}

logger.info("Finished uploading {}.", objectReference.canonicalPath);
}

private PutObjectRequest getPutObjectRequest(RemoteObjectReference s3RemoteObjectReference,
long unencryptedSize,
Tag... tags)
{
Tag... tags) {
return PutObjectRequest.builder()
.bucket(request.storageLocation.bucket)
.key(s3RemoteObjectReference.canonicalPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,19 @@

import java.net.URI;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.Protocol;
import com.instaclustr.esop.impl.ProxySettings;
import com.instaclustr.esop.s3.S3ConfigurationResolver;
import com.instaclustr.esop.s3.S3ConfigurationResolver.S3Configuration;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.endpoints.Endpoint;
import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.apache.ProxyConfiguration;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.endpoints.S3EndpointParams;
import software.amazon.awssdk.services.s3.endpoints.S3EndpointProvider;
import software.amazon.encryption.s3.S3EncryptionClient;

public class S3ClientsFactory {
Expand Down Expand Up @@ -104,7 +99,7 @@ public S3Client getEncryptingClient(S3Client wrappedClient, String kmsKeyId) {

private S3Client getDefaultS3Client(S3Configuration s3Conf, ProxySettings proxySettings) {
S3ClientBuilder builder = S3Client.builder()
.credentialsProvider(DefaultCredentialsProvider.create());
.credentialsProvider(InstanceProfileCredentialsProvider.create());
if (s3Conf.awsRegion != null)
builder.region(Region.of(s3Conf.awsRegion));

Expand Down