From 8c222fccda171f2ae78dad213bbf2c853ebce334 Mon Sep 17 00:00:00 2001 From: Alex Hall Date: Tue, 28 Jan 2020 11:56:35 +0200 Subject: [PATCH] Upgrade to AWS SDK v2, tests not working --- pom.xml | 14 +-- .../s3upload/MultiPartOutputStream.java | 3 +- .../s3upload/StreamTransferManager.java | 96 ++++++++------- .../test/StreamTransferManagerTest.java | 110 ++++++++---------- 4 files changed, 111 insertions(+), 112 deletions(-) diff --git a/pom.xml b/pom.xml index e2429d8..c8930b6 100644 --- a/pom.xml +++ b/pom.xml @@ -38,8 +38,8 @@ - com.amazonaws - aws-java-sdk-s3 + software.amazon.awssdk + s3 org.slf4j @@ -63,9 +63,9 @@ - com.amazonaws - aws-java-sdk-bom - 1.11.666 + software.amazon.awssdk + bom + 2.10.56 pom import @@ -83,8 +83,8 @@ maven-compiler-plugin 2.3.2 - 1.6 - 1.6 + 8 + 8 diff --git a/src/main/java/alex/mojaki/s3upload/MultiPartOutputStream.java b/src/main/java/alex/mojaki/s3upload/MultiPartOutputStream.java index 0f243aa..8614818 100644 --- a/src/main/java/alex/mojaki/s3upload/MultiPartOutputStream.java +++ b/src/main/java/alex/mojaki/s3upload/MultiPartOutputStream.java @@ -6,7 +6,8 @@ import java.io.OutputStream; import java.util.concurrent.BlockingQueue; -import static com.amazonaws.services.s3.internal.Constants.MB; +import static alex.mojaki.s3upload.StreamTransferManager.MB; + /** * An {@code OutputStream} which packages data written to it into discrete {@link StreamPart}s which can be obtained diff --git a/src/main/java/alex/mojaki/s3upload/StreamTransferManager.java b/src/main/java/alex/mojaki/s3upload/StreamTransferManager.java index 8235ed6..ffb572d 100644 --- a/src/main/java/alex/mojaki/s3upload/StreamTransferManager.java +++ b/src/main/java/alex/mojaki/s3upload/StreamTransferManager.java @@ -1,17 +1,16 @@ package alex.mojaki.s3upload; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.*; -import java.io.ByteArrayInputStream; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.*; -import static com.amazonaws.services.s3.internal.Constants.MB; // @formatter:off /** @@ -81,7 +80,7 @@ public void run() { * on what this class can accomplish. If order of data is important to you, then either use only one stream or ensure * that you write at least 5 MB to every stream. *

- * While performing the multipart upload this class will create instances of {@link InitiateMultipartUploadRequest}, + * While performing the multipart upload this class will create instances of {@link CreateMultipartUploadRequest}, * {@link UploadPartRequest}, and {@link CompleteMultipartUploadRequest}, fill in the essential details, and send them * off. If you need to add additional details then override the appropriate {@code customise*Request} methods and * set the required properties within. Note that if no data is written (i.e. the object body is empty) then a normal (not multipart) upload will be performed and {@code customisePutEmptyObjectRequest} will be called instead. @@ -101,15 +100,17 @@ public class StreamTransferManager { private static final Logger log = LoggerFactory.getLogger(StreamTransferManager.class); + public static final int MB = 1024 * 1024; + protected final String bucketName; protected final String putKey; - protected final AmazonS3 s3Client; + protected final S3Client s3Client; protected String uploadId; protected int numStreams = 1; protected int numUploadThreads = 1; protected int queueCapacity = 1; protected int partSize = 5 * MB; - private final List partETags = Collections.synchronizedList(new ArrayList()); + private final List partETags = Collections.synchronizedList(new ArrayList<>()); private List multiPartOutputStreams; private ExecutorServiceResultsHandler executorServiceResultsHandler; private BlockingQueue queue; @@ -121,7 +122,7 @@ public class StreamTransferManager { public StreamTransferManager(String bucketName, String putKey, - AmazonS3 s3Client) { + S3Client s3Client) { this.bucketName = bucketName; this.putKey = putKey; this.s3Client = s3Client; @@ -250,12 +251,12 @@ private void ensureCanSet() { } /** - * Deprecated constructor kept for backward compatibility. Use {@link StreamTransferManager#StreamTransferManager(String, String, AmazonS3)} and then chain the desired setters. + * Deprecated constructor kept for backward compatibility. Use {@link StreamTransferManager#StreamTransferManager(String, String, S3Client)} and then chain the desired setters. 
*/ @Deprecated public StreamTransferManager(String bucketName, String putKey, - AmazonS3 s3Client, + S3Client s3Client, int numStreams, int numUploadThreads, int queueCapacity, @@ -278,15 +279,15 @@ public List getMultiPartOutputStreams() { return multiPartOutputStreams; } - queue = new ArrayBlockingQueue(queueCapacity); + queue = new ArrayBlockingQueue<>(queueCapacity); log.debug("Initiating multipart upload to {}/{}", bucketName, putKey); - InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName, putKey); - customiseInitiateRequest(initRequest); - InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest); - uploadId = initResponse.getUploadId(); + CreateMultipartUploadRequest initRequest = CreateMultipartUploadRequest.builder() + .bucket(bucketName).key(putKey).applyMutation(this::customiseInitiateRequest).build(); + CreateMultipartUploadResponse initResponse = s3Client.createMultipartUpload(initRequest); + uploadId = initResponse.uploadId(); log.info("Initiated multipart upload to {}/{} with full ID {}", bucketName, putKey, uploadId); try { - multiPartOutputStreams = new ArrayList(); + multiPartOutputStreams = new ArrayList<>(); ExecutorService threadPool = Executors.newFixedThreadPool(numUploadThreads); int partNumberStart = 1; @@ -298,7 +299,7 @@ public List getMultiPartOutputStreams() { multiPartOutputStreams.add(multiPartOutputStream); } - executorServiceResultsHandler = new ExecutorServiceResultsHandler(threadPool); + executorServiceResultsHandler = new ExecutorServiceResultsHandler<>(threadPool); for (int i = 0; i < numUploadThreads; i++) { executorServiceResultsHandler.submit(new UploadTask()); } @@ -329,20 +330,21 @@ public void complete() { log.debug("{}: Completing", this); if (partETags.isEmpty()) { log.debug("{}: Uploading empty stream", this); - ByteArrayInputStream emptyStream = new ByteArrayInputStream(new byte[]{}); - ObjectMetadata metadata = new ObjectMetadata(); - metadata.setContentLength(0); - PutObjectRequest request = new PutObjectRequest(bucketName, putKey, emptyStream, metadata); - customisePutEmptyObjectRequest(request); - s3Client.putObject(request); + PutObjectRequest request = PutObjectRequest.builder() + .bucket(bucketName) + .key(putKey) + .contentLength(0L) + .applyMutation(this::customisePutEmptyObjectRequest) + .build(); + s3Client.putObject(request, RequestBody.empty()); } else { - CompleteMultipartUploadRequest completeRequest = new - CompleteMultipartUploadRequest( - bucketName, - putKey, - uploadId, - partETags); - customiseCompleteRequest(completeRequest); + CompleteMultipartUploadRequest completeRequest = CompleteMultipartUploadRequest.builder() + .bucket(bucketName) + .key(putKey) + .uploadId(uploadId) + .multipartUpload(b -> b.parts(partETags)) + .applyMutation(this::customiseCompleteRequest) + .build(); s3Client.completeMultipartUpload(completeRequest); } log.info("{}: Completed", this); @@ -387,8 +389,8 @@ public void abort() { } if (uploadId != null) { log.debug("{}: Aborting", this); - AbortMultipartUploadRequest abortMultipartUploadRequest = new AbortMultipartUploadRequest( - bucketName, putKey, uploadId); + AbortMultipartUploadRequest abortMultipartUploadRequest = AbortMultipartUploadRequest.builder() + .bucket(bucketName).key(putKey).uploadId(uploadId).build(); s3Client.abortMultipartUpload(abortMultipartUploadRequest); log.info("{}: Aborted", this); } @@ -465,15 +467,19 @@ part remaining, which S3 can accept. It is uploaded in the complete() method. 
private void uploadStreamPart(StreamPart part) { log.debug("{}: Uploading {}", this, part); - UploadPartRequest uploadRequest = new UploadPartRequest() - .withBucketName(bucketName).withKey(putKey) - .withUploadId(uploadId).withPartNumber(part.getPartNumber()) - .withInputStream(part.getInputStream()) - .withPartSize(part.size()); - customiseUploadPartRequest(uploadRequest); - - UploadPartResult uploadPartResult = s3Client.uploadPart(uploadRequest); - PartETag partETag = uploadPartResult.getPartETag(); + UploadPartRequest uploadRequest = UploadPartRequest.builder() + .bucket(bucketName) + .key(putKey) + .uploadId(uploadId) + .partNumber(part.getPartNumber()) + .applyMutation(this::customiseUploadPartRequest) + .build(); + + UploadPartResponse uploadPartResult = s3Client.uploadPart( + uploadRequest, + RequestBody.fromInputStream(part.getInputStream(), part.size())); + CompletedPart partETag = CompletedPart.builder() + .partNumber(part.getPartNumber()).eTag(uploadPartResult.eTag()).build(); partETags.add(partETag); log.info("{}: Finished uploading {}", this, part); } @@ -487,19 +493,19 @@ public String toString() { // These methods are intended to be overridden for more specific interactions with the AWS API. @SuppressWarnings("unused") - public void customiseInitiateRequest(InitiateMultipartUploadRequest request) { + public void customiseInitiateRequest(CreateMultipartUploadRequest.Builder requestBuilder) { } @SuppressWarnings("unused") - public void customiseUploadPartRequest(UploadPartRequest request) { + public void customiseUploadPartRequest(UploadPartRequest.Builder requestBuilder) { } @SuppressWarnings("unused") - public void customiseCompleteRequest(CompleteMultipartUploadRequest request) { + public void customiseCompleteRequest(CompleteMultipartUploadRequest.Builder requestBuilder) { } @SuppressWarnings("unused") - public void customisePutEmptyObjectRequest(PutObjectRequest request) { + public void customisePutEmptyObjectRequest(PutObjectRequest.Builder requestBuilder) { } } \ No newline at end of file diff --git a/src/test/java/alex/mojaki/s3upload/test/StreamTransferManagerTest.java b/src/test/java/alex/mojaki/s3upload/test/StreamTransferManagerTest.java index 191c65b..1ef639b 100644 --- a/src/test/java/alex/mojaki/s3upload/test/StreamTransferManagerTest.java +++ b/src/test/java/alex/mojaki/s3upload/test/StreamTransferManagerTest.java @@ -2,18 +2,6 @@ import alex.mojaki.s3upload.MultiPartOutputStream; import alex.mojaki.s3upload.StreamTransferManager; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.SDKGlobalConfiguration; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.S3ObjectInputStream; -import com.amazonaws.services.s3.model.UploadPartRequest; -import com.amazonaws.util.AwsHostNameUtils; -import com.amazonaws.util.IOUtils; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.io.Resources; @@ -28,6 +16,16 @@ import org.jclouds.logging.slf4j.config.SLF4JLoggingModule; import org.junit.*; import org.junit.rules.ExpectedException; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import 
software.amazon.awssdk.auth.signer.AwsS3V4Signer; +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.utils.AttributeMap; +import software.amazon.awssdk.utils.IoUtils; import java.io.InputStream; import java.net.URI; @@ -39,17 +37,13 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import static software.amazon.awssdk.http.SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES; + /** * A WIP test using s3proxy to avoid requiring actually connecting to a real S3 bucket. */ public class StreamTransferManagerTest { - static { - System.setProperty( - SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, - "true"); - } - @Rule public ExpectedException thrown = ExpectedException.none(); @@ -58,17 +52,14 @@ public class StreamTransferManagerTest { private BlobStoreContext context; private String containerName; private String key; - private BasicAWSCredentials awsCreds; + private AwsBasicCredentials awsCreds; @Before public void setUp() throws Exception { Properties s3ProxyProperties = new Properties(); - InputStream is = Resources.asByteSource(Resources.getResource( - "s3proxy.conf")).openStream(); - try { + try (InputStream is = Resources.asByteSource(Resources.getResource( + "s3proxy.conf")).openStream()) { s3ProxyProperties.load(is); - } finally { - is.close(); } String provider = s3ProxyProperties.getProperty( @@ -83,7 +74,8 @@ public void setUp() throws Exception { S3ProxyConstants.PROPERTY_IDENTITY); String s3Credential = s3ProxyProperties.getProperty( S3ProxyConstants.PROPERTY_CREDENTIAL); - awsCreds = new BasicAWSCredentials(s3Identity, s3Credential); + + awsCreds = AwsBasicCredentials.create(s3Identity, s3Credential); s3Endpoint = new URI(s3ProxyProperties.getProperty( S3ProxyConstants.PROPERTY_ENDPOINT)); String keyStorePath = s3ProxyProperties.getProperty( @@ -110,8 +102,7 @@ public void setUp() throws Exception { S3Proxy.Builder s3ProxyBuilder = S3Proxy.builder() .blobStore(blobStore) .endpoint(s3Endpoint); - //noinspection ConstantConditions - if (s3Identity != null || s3Credential != null) { + if (s3Identity != null && s3Credential != null) { s3ProxyBuilder.awsAuthentication(s3Identity, s3Credential); } if (keyStorePath != null || keyStorePassword != null) { @@ -152,52 +143,53 @@ public void testTransferManager() throws Exception { } private void testTransferManager(final int numLines) throws Exception { - AmazonS3 client = AmazonS3ClientBuilder.standard() - .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) - .withClientConfiguration(new ClientConfiguration().withSignerOverride("S3SignerType")) - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(s3Endpoint.toString(), - AwsHostNameUtils.parseRegion(s3Endpoint.toString(), null))) - .enablePathStyleAccess() + S3Client client = S3Client.builder() + .credentialsProvider(StaticCredentialsProvider.create(awsCreds)) + .endpointOverride(s3Endpoint) + .overrideConfiguration(b -> b.putAdvancedOption( + SdkAdvancedClientOption.SIGNER, AwsS3V4Signer.create())) + .httpClient(ApacheHttpClient.builder().buildWithDefaults( + AttributeMap.builder() + .put(TRUST_ALL_CERTIFICATES, Boolean.TRUE) + .build() + )) +// .enablePathStyleAccess() .build(); int numStreams = 2; final StreamTransferManager 
manager = new StreamTransferManager(containerName, key, client) { - @Override - public void customiseUploadPartRequest(UploadPartRequest request) { - /* - Workaround from https://github.com/andrewgaul/s3proxy/commit/50a302436271ec46ce81a415b4208b9e14fcaca4 - to deal with https://github.com/andrewgaul/s3proxy/issues/80 - */ - ObjectMetadata metadata = new ObjectMetadata(); - metadata.setContentType("application/unknown"); - request.setObjectMetadata(metadata); - } +// @Override +// public void customiseUploadPartRequest(UploadPartRequest request) { +// /* +// Workaround from https://github.com/andrewgaul/s3proxy/commit/50a302436271ec46ce81a415b4208b9e14fcaca4 +// to deal with https://github.com/andrewgaul/s3proxy/issues/80 +// */ +// ObjectMetadata metadata = new ObjectMetadata(); +// metadata.setContentType("application/unknown"); +// request.setObjectMetadata(metadata); +// } }.numStreams(numStreams) .numUploadThreads(2) .queueCapacity(2) .partSize(10); final List streams = manager.getMultiPartOutputStreams(); - List builders = new ArrayList(numStreams); + List builders = new ArrayList<>(numStreams); ExecutorService pool = Executors.newFixedThreadPool(numStreams); for (int i = 0; i < numStreams; i++) { final int streamIndex = i; final StringBuilder builder = new StringBuilder(); builders.add(builder); - Runnable task = new Runnable() { - @Override - public void run() { - MultiPartOutputStream outputStream = streams.get(streamIndex); - for (int lineNum = 0; lineNum < numLines; lineNum++) { - String line = String.format("Stream %d, line %d\n", streamIndex, lineNum); - outputStream.write(line.getBytes()); - builder.append(line); - } - outputStream.close(); + pool.submit(() -> { + MultiPartOutputStream outputStream = streams.get(streamIndex); + for (int lineNum = 0; lineNum < numLines; lineNum++) { + String line = String.format("Stream %d, line %d\n", streamIndex, lineNum); + outputStream.write(line.getBytes()); + builder.append(line); } - }; - pool.submit(task); + outputStream.close(); + }); } pool.shutdown(); pool.awaitTermination(5, TimeUnit.SECONDS); @@ -209,9 +201,9 @@ public void run() { String expectedResult = builders.get(0).toString(); - S3ObjectInputStream objectContent = client.getObject(containerName, key).getObjectContent(); - String result = IOUtils.toString(objectContent); - IOUtils.closeQuietly(objectContent, null); + ResponseInputStream responseInputStream = client.getObject(b -> b.bucket(containerName).key(key)); + String result = IoUtils.toUtf8String(responseInputStream); + IoUtils.closeQuietly(responseInputStream, null); Assert.assertEquals(expectedResult, result); }
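
A note on the new customise* hooks: SDK v2 request objects are immutable, so the hooks now receive builders and are wired in through applyMutation(this::customiseInitiateRequest) and friends rather than mutating a request in place as in v1. A minimal sketch of a caller using the new signature to set a content type and user metadata on the initiation request — the subclass name and values here are illustrative, not part of the patch:

    import java.util.Collections;

    import alex.mojaki.s3upload.StreamTransferManager;
    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;

    // Hypothetical subclass, for illustration only: the hook now mutates the
    // builder that StreamTransferManager passes in via applyMutation.
    public class ContentTypeTransferManager extends StreamTransferManager {

        public ContentTypeTransferManager(String bucket, String key, S3Client client) {
            super(bucket, key, client);
        }

        @Override
        public void customiseInitiateRequest(CreateMultipartUploadRequest.Builder builder) {
            builder.contentType("text/plain")
                   .metadata(Collections.singletonMap("origin", "stream-transfer-manager"));
        }
    }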
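On the commented-out enablePathStyleAccess() call in the test: the v2 client builder has no method of that name, and the s3proxy endpoint likely still needs path-style addressing. A sketch of the usual v2 route via S3Configuration, with placeholder endpoint and credentials standing in for the test's s3Endpoint and awsCreds (assumption: this is one of the reasons the tests are not yet working; v2 also requires an explicit region unless one is resolvable from the environment):

    import java.net.URI;

    import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
    import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.s3.S3Configuration;

    public class PathStyleClientSketch {

        // Sketch only: v2 equivalent of the v1 enablePathStyleAccess().
        static S3Client build() {
            return S3Client.builder()
                    // Placeholder values; the test would use awsCreds and s3Endpoint.
                    .credentialsProvider(StaticCredentialsProvider.create(
                            AwsBasicCredentials.create("identity", "credential")))
                    .endpointOverride(URI.create("https://127.0.0.1:8443"))
                    // v2 requires a region even when the endpoint is overridden.
                    .region(Region.US_EAST_1)
                    // Path-style addressing moves to the service configuration in v2.
                    .serviceConfiguration(S3Configuration.builder()
                            .pathStyleAccessEnabled(true)
                            .build())
                    .build();
        }
    }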