Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.salesforce.multicloudj.common.exceptions.InvalidArgumentException;
import com.salesforce.multicloudj.common.exceptions.SubstrateSdkException;
import com.salesforce.multicloudj.common.exceptions.UnknownException;
import com.salesforce.multicloudj.common.retries.RetryConfig;
import lombok.Getter;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
Expand Down Expand Up @@ -68,6 +69,7 @@
import java.io.OutputStream;
import java.net.URL;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
Expand Down Expand Up @@ -531,6 +533,20 @@ private static S3Client buildS3Client(Builder builder) {
if(shouldConfigureHttpClient(builder)) {
b.httpClient(generateHttpClient(builder));
}
if (builder.getRetryConfig() != null) {
// Create a temporary transformer instance for retry strategy conversion
AwsTransformer transformer = builder.getTransformerSupplier().get(builder.getBucket());
b.overrideConfiguration(config -> {
config.retryStrategy(transformer.toAwsRetryStrategy(builder.getRetryConfig()));
// Set API call timeouts if provided
if (builder.getRetryConfig().getAttemptTimeout() != null) {
config.apiCallAttemptTimeout(Duration.ofMillis(builder.getRetryConfig().getAttemptTimeout()));
}
if (builder.getRetryConfig().getTotalTimeout() != null) {
config.apiCallTimeout(Duration.ofMillis(builder.getRetryConfig().getTotalTimeout()));
}
});
}

return b.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import com.salesforce.multicloudj.blob.driver.PresignedUrlRequest;
import com.salesforce.multicloudj.blob.driver.UploadPartResponse;
import com.salesforce.multicloudj.common.exceptions.InvalidArgumentException;
import com.salesforce.multicloudj.common.exceptions.UnSupportedOperationException;
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
import software.amazon.awssdk.services.s3.model.StorageClass;
import com.salesforce.multicloudj.blob.driver.UploadRequest;
import com.salesforce.multicloudj.common.retries.RetryConfig;
import com.salesforce.multicloudj.common.util.HexUtil;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.retries.StandardRetryStrategy;
import software.amazon.awssdk.retries.api.RetryStrategy;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CommonPrefix;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
Expand Down Expand Up @@ -62,6 +64,7 @@

import java.io.InputStream;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
Expand Down Expand Up @@ -476,4 +479,60 @@ public List<BlobIdentifier> toBlobIdentifiers(List<BlobInfo> blobList) {
.map(blob -> new BlobIdentifier(blob.getKey(), null))
.collect(Collectors.toList());
}

/**
* Converts MultiCloudJ RetryConfig to AWS SDK RetryStrategy
*
* @param retryConfig The retry configuration to convert
* @return AWS SDK RetryStrategy
* @throws InvalidArgumentException if retryConfig is null or has invalid values
*/
public RetryStrategy toAwsRetryStrategy(RetryConfig retryConfig) {
if (retryConfig == null) {
throw new InvalidArgumentException("RetryConfig cannot be null");
}
if (retryConfig.getMaxAttempts() != null && retryConfig.getMaxAttempts() <= 0) {
throw new InvalidArgumentException("RetryConfig.maxAttempts must be greater than 0, got: " + retryConfig.getMaxAttempts());
}

StandardRetryStrategy.Builder strategyBuilder = StandardRetryStrategy.builder();

// Only set maxAttempts if provided, otherwise use AWS SDK default
if (retryConfig.getMaxAttempts() != null) {
strategyBuilder.maxAttempts(retryConfig.getMaxAttempts());
}

// If mode is not set, use AWS SDK's default backoff strategy
if (retryConfig.getMode() == null) {
return strategyBuilder.build();
}

// Configure backoff strategy based on mode
if (retryConfig.getMode() == RetryConfig.Mode.EXPONENTIAL) {
if (retryConfig.getInitialDelayMillis() <= 0) {
throw new InvalidArgumentException("RetryConfig.initialDelayMillis must be greater than 0 for EXPONENTIAL mode, got: " + retryConfig.getInitialDelayMillis());
}
if (retryConfig.getMaxDelayMillis() <= 0) {
throw new InvalidArgumentException("RetryConfig.maxDelayMillis must be greater than 0 for EXPONENTIAL mode, got: " + retryConfig.getMaxDelayMillis());
}
strategyBuilder.backoffStrategy(
software.amazon.awssdk.retries.api.BackoffStrategy.exponentialDelay(
Duration.ofMillis(retryConfig.getInitialDelayMillis()),
Duration.ofMillis(retryConfig.getMaxDelayMillis())
)
);
return strategyBuilder.build();
}

// FIXED mode
if (retryConfig.getFixedDelayMillis() <= 0) {
throw new InvalidArgumentException("RetryConfig.fixedDelayMillis must be greater than 0 for FIXED mode, got: " + retryConfig.getFixedDelayMillis());
}
strategyBuilder.backoffStrategy(
software.amazon.awssdk.retries.api.BackoffStrategy.fixedDelay(
Duration.ofMillis(retryConfig.getFixedDelayMillis())
)
);
return strategyBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.salesforce.multicloudj.common.aws.CredentialsProvider;
import com.salesforce.multicloudj.common.exceptions.InvalidArgumentException;
import com.salesforce.multicloudj.common.exceptions.SubstrateSdkException;
import com.salesforce.multicloudj.common.retries.RetryConfig;
import com.salesforce.multicloudj.sts.model.CredentialsOverrider;
import lombok.Getter;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
Expand Down Expand Up @@ -71,6 +72,7 @@
import java.io.OutputStream;
import java.net.URL;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
Expand Down Expand Up @@ -531,6 +533,22 @@ private static void applyCommonConfig(S3AsyncClientBuilder builder, Builder conf
}
builder.multipartConfiguration(configBuilder.build());

// Configure retry strategy if specified
if (config.getRetryConfig() != null) {
// Create a temporary transformer instance for retry strategy conversion
AwsTransformer transformer = config.getTransformerSupplier().get(config.getBucket());
builder.overrideConfiguration(overrideConfig -> {
overrideConfig.retryStrategy(transformer.toAwsRetryStrategy(config.getRetryConfig()));
// Set API call timeouts if provided
if (config.getRetryConfig().getAttemptTimeout() != null) {
overrideConfig.apiCallAttemptTimeout(Duration.ofMillis(config.getRetryConfig().getAttemptTimeout()));
}
if (config.getRetryConfig().getTotalTimeout() != null) {
overrideConfig.apiCallTimeout(Duration.ofMillis(config.getRetryConfig().getTotalTimeout()));
}
});
}

// Configure async configuration if executor service is specified
if (config.getExecutorService() != null) {
builder.asyncConfiguration(ClientAsyncConfiguration.builder()
Expand Down Expand Up @@ -573,6 +591,11 @@ private static void applyCommonConfig(S3CrtAsyncClientBuilder builder, Builder c
builder.minimumPartSizeInBytes(config.getPartBufferSize());
}

// Configure retry policy if specified
if (config.getRetryConfig() != null) {
builder.retryConfiguration(retryConfig -> retryConfig.numRetries(config.getRetryConfig().getMaxAttempts() - 1));
}

// Configure executor service if specified
if (config.getExecutorService() != null) {
builder.futureCompletionExecutor(config.getExecutorService());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.salesforce.multicloudj.blob.driver.UploadResponse;
import com.salesforce.multicloudj.common.exceptions.InvalidArgumentException;
import com.salesforce.multicloudj.common.exceptions.UnknownException;
import com.salesforce.multicloudj.common.retries.RetryConfig;
import com.salesforce.multicloudj.sts.model.CredentialsOverrider;
import com.salesforce.multicloudj.sts.model.CredentialsType;
import com.salesforce.multicloudj.sts.model.StsCredentials;
Expand All @@ -35,6 +36,7 @@
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.core.sync.ResponseTransformer;
Expand Down Expand Up @@ -96,6 +98,7 @@
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand All @@ -107,6 +110,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
Expand All @@ -128,6 +132,17 @@ void setup() {
S3ClientBuilder mockBuilder = mock(S3ClientBuilder.class);
when(mockBuilder.region(any())).thenReturn(mockBuilder);

// Execute the consumer lambda to cover lines 540-549
doAnswer(invocation -> {
Consumer<ClientOverrideConfiguration.Builder> consumer = invocation.getArgument(0);
ClientOverrideConfiguration.Builder configBuilder = mock(ClientOverrideConfiguration.Builder.class);
when(configBuilder.retryStrategy(any(software.amazon.awssdk.retries.api.RetryStrategy.class))).thenReturn(configBuilder);
when(configBuilder.apiCallAttemptTimeout(any(Duration.class))).thenReturn(configBuilder);
when(configBuilder.apiCallTimeout(any(Duration.class))).thenReturn(configBuilder);
consumer.accept(configBuilder);
return mockBuilder;
}).when(mockBuilder).overrideConfiguration(any(Consumer.class));

s3Client = mockStatic(S3Client.class);
s3Client.when(S3Client::builder).thenReturn(mockBuilder);

Expand Down Expand Up @@ -950,4 +965,126 @@ private S3Object mockObject(int index) {
when(mockS3.size()).thenReturn((long) index);
return mockS3;
}

@Test
void testBuildS3ClientWithRetryConfig() {
// Test with exponential retry config
RetryConfig exponentialConfig = RetryConfig.builder()
.mode(RetryConfig.Mode.EXPONENTIAL)
.maxAttempts(3)
.initialDelayMillis(100L)
.multiplier(2.0)
.maxDelayMillis(5000L)
.attemptTimeout(5000L)
.totalTimeout(30000L)
.build();

var store = new AwsBlobStore.Builder()
.withTransformerSupplier(transformerSupplier)
.withBucket("bucket-1")
.withRegion("us-east-2")
.withRetryConfig(exponentialConfig)
.build();

assertNotNull(store);
assertEquals("bucket-1", store.getBucket());
}

@Test
void testBuildS3ClientWithFixedRetryConfig() {
// Test with fixed retry config
RetryConfig fixedConfig = RetryConfig.builder()
.mode(RetryConfig.Mode.FIXED)
.maxAttempts(5)
.fixedDelayMillis(1000L)
.build();

var store = new AwsBlobStore.Builder()
.withTransformerSupplier(transformerSupplier)
.withBucket("bucket-1")
.withRegion("us-east-2")
.withRetryConfig(fixedConfig)
.build();

assertNotNull(store);
assertEquals("bucket-1", store.getBucket());
}

@Test
void testBuildS3ClientWithRetryConfigWithNullMaxAttempts() {
// Test with null maxAttempts (should use AWS default)
RetryConfig config = RetryConfig.builder()
.mode(RetryConfig.Mode.EXPONENTIAL)
.maxAttempts(null)
.initialDelayMillis(100L)
.maxDelayMillis(5000L)
.build();

var store = new AwsBlobStore.Builder()
.withTransformerSupplier(transformerSupplier)
.withBucket("bucket-1")
.withRegion("us-east-2")
.withRetryConfig(config)
.build();

assertNotNull(store);
assertEquals("bucket-1", store.getBucket());
}

@Test
void testBuildS3ClientWithRetryConfigWithAttemptTimeout() {
// Test with attempt timeout only
RetryConfig config = RetryConfig.builder()
.mode(RetryConfig.Mode.EXPONENTIAL)
.maxAttempts(3)
.initialDelayMillis(100L)
.maxDelayMillis(5000L)
.attemptTimeout(5000L)
.build();

var store = new AwsBlobStore.Builder()
.withTransformerSupplier(transformerSupplier)
.withBucket("bucket-1")
.withRegion("us-east-2")
.withRetryConfig(config)
.build();

assertNotNull(store);
assertEquals("bucket-1", store.getBucket());
}

@Test
void testBuildS3ClientWithRetryConfigWithTotalTimeout() {
// Test with total timeout only
RetryConfig config = RetryConfig.builder()
.mode(RetryConfig.Mode.EXPONENTIAL)
.maxAttempts(3)
.initialDelayMillis(100L)
.maxDelayMillis(5000L)
.totalTimeout(30000L)
.build();

var store = new AwsBlobStore.Builder()
.withTransformerSupplier(transformerSupplier)
.withBucket("bucket-1")
.withRegion("us-east-2")
.withRetryConfig(config)
.build();

assertNotNull(store);
assertEquals("bucket-1", store.getBucket());
}

@Test
void testBuildS3ClientWithoutRetryConfig() {
// Test without retry config (default behavior)
var store = new AwsBlobStore.Builder()
.withTransformerSupplier(transformerSupplier)
.withBucket("bucket-1")
.withRegion("us-east-2")
.build();

assertNotNull(store);
assertEquals("bucket-1", store.getBucket());
}
}
Loading
Loading