Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@

<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
Expand All @@ -63,9 +63,9 @@
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bom</artifactId>
<version>1.11.666</version>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.10.56</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand All @@ -83,8 +83,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import java.io.OutputStream;
import java.util.concurrent.BlockingQueue;

import static com.amazonaws.services.s3.internal.Constants.MB;
import static alex.mojaki.s3upload.StreamTransferManager.MB;


/**
* An {@code OutputStream} which packages data written to it into discrete {@link StreamPart}s which can be obtained
Expand Down
96 changes: 51 additions & 45 deletions src/main/java/alex/mojaki/s3upload/StreamTransferManager.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package alex.mojaki.s3upload;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.*;

import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;

import static com.amazonaws.services.s3.internal.Constants.MB;

// @formatter:off
/**
Expand Down Expand Up @@ -81,7 +80,7 @@ public void run() {
* on what this class can accomplish. If order of data is important to you, then either use only one stream or ensure
* that you write at least 5 MB to every stream.
* <p>
* While performing the multipart upload this class will create instances of {@link InitiateMultipartUploadRequest},
* While performing the multipart upload this class will create instances of {@link CreateMultipartUploadRequest},
* {@link UploadPartRequest}, and {@link CompleteMultipartUploadRequest}, fill in the essential details, and send them
* off. If you need to add additional details then override the appropriate {@code customise*Request} methods and
* set the required properties within. Note that if no data is written (i.e. the object body is empty) then a normal (not multipart) upload will be performed and {@code customisePutEmptyObjectRequest} will be called instead.
Expand All @@ -101,15 +100,17 @@ public class StreamTransferManager {

private static final Logger log = LoggerFactory.getLogger(StreamTransferManager.class);

public static final int MB = 1024 * 1024;

protected final String bucketName;
protected final String putKey;
protected final AmazonS3 s3Client;
protected final S3Client s3Client;
protected String uploadId;
protected int numStreams = 1;
protected int numUploadThreads = 1;
protected int queueCapacity = 1;
protected int partSize = 5 * MB;
private final List<PartETag> partETags = Collections.synchronizedList(new ArrayList<PartETag>());
private final List<CompletedPart> partETags = Collections.synchronizedList(new ArrayList<>());
private List<MultiPartOutputStream> multiPartOutputStreams;
private ExecutorServiceResultsHandler<Void> executorServiceResultsHandler;
private BlockingQueue<StreamPart> queue;
Expand All @@ -121,7 +122,7 @@ public class StreamTransferManager {

public StreamTransferManager(String bucketName,
String putKey,
AmazonS3 s3Client) {
S3Client s3Client) {
this.bucketName = bucketName;
this.putKey = putKey;
this.s3Client = s3Client;
Expand Down Expand Up @@ -250,12 +251,12 @@ private void ensureCanSet() {
}

/**
* Deprecated constructor kept for backward compatibility. Use {@link StreamTransferManager#StreamTransferManager(String, String, AmazonS3)} and then chain the desired setters.
* Deprecated constructor kept for backward compatibility. Use {@link StreamTransferManager#StreamTransferManager(String, String, S3Client)} and then chain the desired setters.
*/
@Deprecated
public StreamTransferManager(String bucketName,
String putKey,
AmazonS3 s3Client,
S3Client s3Client,
int numStreams,
int numUploadThreads,
int queueCapacity,
Expand All @@ -278,15 +279,15 @@ public List<MultiPartOutputStream> getMultiPartOutputStreams() {
return multiPartOutputStreams;
}

queue = new ArrayBlockingQueue<StreamPart>(queueCapacity);
queue = new ArrayBlockingQueue<>(queueCapacity);
log.debug("Initiating multipart upload to {}/{}", bucketName, putKey);
InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName, putKey);
customiseInitiateRequest(initRequest);
InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest);
uploadId = initResponse.getUploadId();
CreateMultipartUploadRequest initRequest = CreateMultipartUploadRequest.builder()
.bucket(bucketName).key(putKey).applyMutation(this::customiseInitiateRequest).build();
CreateMultipartUploadResponse initResponse = s3Client.createMultipartUpload(initRequest);
uploadId = initResponse.uploadId();
log.info("Initiated multipart upload to {}/{} with full ID {}", bucketName, putKey, uploadId);
try {
multiPartOutputStreams = new ArrayList<MultiPartOutputStream>();
multiPartOutputStreams = new ArrayList<>();
ExecutorService threadPool = Executors.newFixedThreadPool(numUploadThreads);

int partNumberStart = 1;
Expand All @@ -298,7 +299,7 @@ public List<MultiPartOutputStream> getMultiPartOutputStreams() {
multiPartOutputStreams.add(multiPartOutputStream);
}

executorServiceResultsHandler = new ExecutorServiceResultsHandler<Void>(threadPool);
executorServiceResultsHandler = new ExecutorServiceResultsHandler<>(threadPool);
for (int i = 0; i < numUploadThreads; i++) {
executorServiceResultsHandler.submit(new UploadTask());
}
Expand Down Expand Up @@ -329,20 +330,21 @@ public void complete() {
log.debug("{}: Completing", this);
if (partETags.isEmpty()) {
log.debug("{}: Uploading empty stream", this);
ByteArrayInputStream emptyStream = new ByteArrayInputStream(new byte[]{});
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(0);
PutObjectRequest request = new PutObjectRequest(bucketName, putKey, emptyStream, metadata);
customisePutEmptyObjectRequest(request);
s3Client.putObject(request);
PutObjectRequest request = PutObjectRequest.builder()
.bucket(bucketName)
.key(putKey)
.contentLength(0L)
.applyMutation(this::customisePutEmptyObjectRequest)
.build();
s3Client.putObject(request, RequestBody.empty());
} else {
CompleteMultipartUploadRequest completeRequest = new
CompleteMultipartUploadRequest(
bucketName,
putKey,
uploadId,
partETags);
customiseCompleteRequest(completeRequest);
CompleteMultipartUploadRequest completeRequest = CompleteMultipartUploadRequest.builder()
.bucket(bucketName)
.key(putKey)
.uploadId(uploadId)
.multipartUpload(b -> b.parts(partETags))
.applyMutation(this::customiseCompleteRequest)
.build();
s3Client.completeMultipartUpload(completeRequest);
}
log.info("{}: Completed", this);
Expand Down Expand Up @@ -387,8 +389,8 @@ public void abort() {
}
if (uploadId != null) {
log.debug("{}: Aborting", this);
AbortMultipartUploadRequest abortMultipartUploadRequest = new AbortMultipartUploadRequest(
bucketName, putKey, uploadId);
AbortMultipartUploadRequest abortMultipartUploadRequest = AbortMultipartUploadRequest.builder()
.bucket(bucketName).key(putKey).uploadId(uploadId).build();
s3Client.abortMultipartUpload(abortMultipartUploadRequest);
log.info("{}: Aborted", this);
}
Expand Down Expand Up @@ -465,15 +467,19 @@ part remaining, which S3 can accept. It is uploaded in the complete() method.
private void uploadStreamPart(StreamPart part) {
log.debug("{}: Uploading {}", this, part);

UploadPartRequest uploadRequest = new UploadPartRequest()
.withBucketName(bucketName).withKey(putKey)
.withUploadId(uploadId).withPartNumber(part.getPartNumber())
.withInputStream(part.getInputStream())
.withPartSize(part.size());
customiseUploadPartRequest(uploadRequest);

UploadPartResult uploadPartResult = s3Client.uploadPart(uploadRequest);
PartETag partETag = uploadPartResult.getPartETag();
UploadPartRequest uploadRequest = UploadPartRequest.builder()
.bucket(bucketName)
.key(putKey)
.uploadId(uploadId)
.partNumber(part.getPartNumber())
.applyMutation(this::customiseUploadPartRequest)
.build();

UploadPartResponse uploadPartResult = s3Client.uploadPart(
uploadRequest,
RequestBody.fromInputStream(part.getInputStream(), part.size()));
CompletedPart partETag = CompletedPart.builder()
.partNumber(part.getPartNumber()).eTag(uploadPartResult.eTag()).build();
partETags.add(partETag);
log.info("{}: Finished uploading {}", this, part);
}
Expand All @@ -487,19 +493,19 @@ public String toString() {
// These methods are intended to be overridden for more specific interactions with the AWS API.

@SuppressWarnings("unused")
public void customiseInitiateRequest(InitiateMultipartUploadRequest request) {
public void customiseInitiateRequest(CreateMultipartUploadRequest.Builder requestBuilder) {
}

@SuppressWarnings("unused")
public void customiseUploadPartRequest(UploadPartRequest request) {
public void customiseUploadPartRequest(UploadPartRequest.Builder requestBuilder) {
}

@SuppressWarnings("unused")
public void customiseCompleteRequest(CompleteMultipartUploadRequest request) {
public void customiseCompleteRequest(CompleteMultipartUploadRequest.Builder requestBuilder) {
}

@SuppressWarnings("unused")
public void customisePutEmptyObjectRequest(PutObjectRequest request) {
public void customisePutEmptyObjectRequest(PutObjectRequest.Builder requestBuilder) {
}

}
Loading