Skip to content
This repository was archived by the owner on Jun 18, 2020. It is now read-only.

Commit d7100fd

Browse files
committed
'Version 1.0.1 of the DynamoDB Import Export Tool'
1 parent 623c333 commit d7100fd

File tree

11 files changed

+80
-55
lines changed

11 files changed

+80
-55
lines changed

README.md

Lines changed: 26 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,44 @@
11
# DynamoDB Import Export Tool
2+
The DynamoDB Import Export Tool is designed to perform parallel scans on the source table, store scan results in a queue, then consume the queue by writing the items asynchronously to a destination table.
3+
4+
## Requirements ##
5+
* Maven
6+
* JRE 1.7+
7+
* Pre-existing source and destination DynamoDB tables
28

39
## Running as an executable
410

5-
mvn exec:java -Dexec.mainClass="com.amazonaws.dynamodb.bootstrap.CommandLineInterface" \
11+
1. Build the library:
12+
13+
```
14+
mvn install
15+
```
16+
17+
2. This produces the target jar in the target/ directory, to start the replication process:
18+
19+
java -jar dynamodb-import-export-tool.jar
620

7-
-Dexec.args=" \
8-
9-
--destinationEndpoint dynamodb.us-east-1.amazonaws.com \
10-
11-
--destinationTable nameOfDestinationTable \
12-
13-
--sourceEndpoint dynamodb.us-west-1.amazonaws.com \
14-
15-
--sourceTable nameOfSourceTable \
16-
17-
--readThroughputRatio .5 \
18-
19-
--writeThroughputRatio .7 \
20-
21-
--maxWriteThreads 128 \
22-
23-
--totalSections 4 \
24-
25-
--section 3"
21+
--destinationEndpoint <destination_endpoint> // the DynamoDB endpoint where the destination table is located.
2622

27-
### Description of Arguments
23+
--destinationTable <destination_table> // the destination table to write to.
2824

29-
destinationEndpoint: The endpoint where the destination table is located
25+
--sourceEndpoint <source_endpoint> // the endpoint where the source table is located.
3026

31-
destinationTable: The destination table to write to
27+
--sourceTable <source_table>// the source table to read from.
3228

33-
sourceEndpoint: The endpoint where the source table is located
29+
--readThroughputRatio <ratio_in_decimal> // the ratio of read throughput to consume from the source table.
3430

35-
sourceTable: The source table to read from
31+
--writeThroughputRatio <ratio_in_decimal> // the ratio of write throughput to consume from the destination table.
3632

37-
readThroughputRatio: The ratio of read throughput to consume from the source table
33+
--maxWriteThreads <numWriteThreads> // (Optional, default=128 * Available_Processors) Maximum number of write threads to create.
3834

39-
writeThroughputRatio: The ratio of write throughput to consume from the destination table
35+
--totalSections <numSections> // (Optional, default=1) Total number of sections to split the bootstrap into. Each application will only scan and write one section.
4036

41-
maxWriteThreads: (Optional, default=128 * Available_Processors) Maximum number of write threads to create
37+
--section <sectionSequence> // (Optional, default=0) section to read and write. Only will scan this one section of all sections, [0...totalSections-1].
4238

43-
totalSections: (Optional, default=1) Total number of sections to split the bootstrap into. Each application will only scan and write one section
39+
--consistentScan <boolean> // (Optional, default=false) indicates whether consistent scan should be used when reading from the source table.
4440

45-
section: (Optional, default=0) section to read and write. Only will scan this one section of all sections, [0...totalSections-1]
46-
41+
> **NOTE**: To split the replication process across multiple machines, simply use the totalSections & section command line arguments, where each machine will run one section out of [0 ... totalSections-1].
4742
4843
## Using the API
4944

pom.xml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
22
<modelVersion>4.0.0</modelVersion>
33
<groupId>com.amazonaws</groupId>
4-
<version>1.0.0</version>
4+
<version>1.0.1</version>
55
<artifactId>dynamodb-import-export-tool</artifactId>
66
<packaging>jar</packaging>
77
<name>DynamoDB Import Export Tool</name>
88
<url>https://github.com/awslabs/dynamodb-import-export-tool</url>
9+
<description>Exports DynamoDB items via parallel scan into a blocking queue, then consumes the queue and import DynamoDB items into a replica table using asynchronous writes.</description>
10+
<scm>
11+
<url>https://github.com/awslabs/dynamodb-import-export-tool.git</url>
12+
</scm>
913
<properties>
1014
<aws.java.sdk.version>1.10.10</aws.java.sdk.version>
1115
<powermock.version>1.6.2</powermock.version>
@@ -110,7 +114,6 @@
110114
</configuration>
111115
<version>3.0</version>
112116
</plugin>
113-
114117
<plugin>
115118
<groupId>org.apache.maven.plugins</groupId>
116119
<artifactId>maven-gpg-plugin</artifactId>

src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineInterface.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,15 @@
2323
import org.apache.log4j.LogManager;
2424
import org.apache.log4j.Logger;
2525

26+
import com.amazonaws.ClientConfiguration;
2627
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
2728
import com.amazonaws.dynamodb.bootstrap.constants.BootstrapConstants;
2829
import com.amazonaws.dynamodb.bootstrap.exception.NullReadCapacityException;
2930
import com.amazonaws.dynamodb.bootstrap.exception.SectionOutOfRangeException;
3031
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
3132
import com.amazonaws.services.dynamodbv2.model.TableDescription;
3233
import com.beust.jcommander.JCommander;
34+
import com.beust.jcommander.ParameterException;
3335

3436
/**
3537
* The interface that parses the arguments, and begins to transfer data from one
@@ -53,8 +55,15 @@ public static void main(String[] args) {
5355
CommandLineArgs params = new CommandLineArgs();
5456
JCommander cmd = new JCommander(params);
5557

56-
// parse given arguments
57-
cmd.parse(args);
58+
try {
59+
// parse given arguments
60+
cmd.parse(args);
61+
} catch (ParameterException e) {
62+
LOGGER.error(e);
63+
JCommander.getConsole().println(e.getMessage());
64+
cmd.usage();
65+
System.exit(1);
66+
}
5867

5968
// show usage information if help flag exists
6069
if (params.getHelp()) {
@@ -70,10 +79,13 @@ public static void main(String[] args) {
7079
final int maxWriteThreads = params.getMaxWriteThreads();
7180
final boolean consistentScan = params.getConsistentScan();
7281

82+
final ClientConfiguration sourceConfig = new ClientConfiguration().withMaxConnections(BootstrapConstants.MAX_CONN_SIZE);
83+
final ClientConfiguration destinationConfig = new ClientConfiguration().withMaxConnections(BootstrapConstants.MAX_CONN_SIZE);
84+
7385
final AmazonDynamoDBClient sourceClient = new AmazonDynamoDBClient(
74-
new DefaultAWSCredentialsProviderChain());
86+
new DefaultAWSCredentialsProviderChain(), sourceConfig);
7587
final AmazonDynamoDBClient destinationClient = new AmazonDynamoDBClient(
76-
new DefaultAWSCredentialsProviderChain());
88+
new DefaultAWSCredentialsProviderChain(), destinationConfig);
7789
sourceClient.setEndpoint(sourceEndpoint);
7890
destinationClient.setEndpoint(destinationEndpoint);
7991

@@ -136,7 +148,7 @@ private static double calculateThroughput(
136148
/**
137149
* Returns the thread pool for the destination DynamoDB table.
138150
*/
139-
public static ExecutorService getDestinationThreadPool(int maxWriteThreads) {
151+
private static ExecutorService getDestinationThreadPool(int maxWriteThreads) {
140152
int corePoolSize = BootstrapConstants.DYNAMODB_CLIENT_EXECUTOR_CORE_POOL_SIZE;
141153
if (corePoolSize > maxWriteThreads) {
142154
corePoolSize = maxWriteThreads - 1;
@@ -166,4 +178,4 @@ private static ExecutorService getSourceThreadPool(int numSegments) {
166178
return exec;
167179
}
168180

169-
}
181+
}

src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBBootstrapWorker.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public DynamoDBBootstrapWorker(AmazonDynamoDBClient client,
6464
this.totalSections = totalSections;
6565
this.consistentScan = consistentScan;
6666

67-
threadPool = exec;
67+
super.threadPool = exec;
6868
}
6969

7070
/**
@@ -91,7 +91,7 @@ public DynamoDBBootstrapWorker(AmazonDynamoDBClient client,
9191
if (numProcessors > numThreads) {
9292
numThreads = numProcessors;
9393
}
94-
this.threadPool = Executors.newFixedThreadPool(numThreads);
94+
super.threadPool = Executors.newFixedThreadPool(numThreads);
9595
}
9696

9797
/**
@@ -105,6 +105,7 @@ public void pipe(final AbstractLogConsumer consumer)
105105

106106
final ScanRequest request = new ScanRequest().withTableName(tableName)
107107
.withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
108+
.withLimit(BootstrapConstants.SCAN_LIMIT)
108109
.withConsistentRead(consistentScan);
109110

110111
final ParallelScanExecutor scanService = scanner
@@ -151,4 +152,4 @@ public static int getNumberOfSegments(TableDescription description)
151152
Math.ceil(tableSizeInGigabytes) / 10));
152153
}
153154

154-
}
155+
}

src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ public DynamoDBConsumer(AmazonDynamoDBClient client, String tableName,
5050
this.client = client;
5151
this.tableName = tableName;
5252
this.rateLimiter = RateLimiter.create(rateLimit);
53-
threadPool = exec;
54-
this.exec = new ExecutorCompletionService<Void>(threadPool);
53+
super.threadPool = exec;
54+
super.exec = new ExecutorCompletionService<Void>(threadPool);
5555
}
5656

5757
/**

src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumerWorker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public List<ConsumedCapacity> runWithBackoff(BatchWriteItemRequest req) {
100100
}
101101
}
102102
}
103-
} while (unprocessedItems.get(tableName) != null);
103+
} while (unprocessedItems != null && unprocessedItems.get(tableName) != null);
104104
return consumedCapacities;
105105
} finally {
106106
if (interrupted) {

src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScan.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public ScanRequest copyScanRequest(ScanRequest request) {
7676
.withTotalSegments(request.getTotalSegments())
7777
.withSegment(request.getSegment())
7878
.withReturnConsumedCapacity(request.getReturnConsumedCapacity())
79+
.withLimit(request.getLimit())
7980
.withConsistentRead(request.getConsistentRead());
8081
}
81-
}
82+
}

src/main/java/com/amazonaws/dynamodb/bootstrap/ScanSegmentWorker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ public SegmentedScanResult call() {
6767
int itemSize = isConsistent ? BootstrapConstants.STRONGLY_CONSISTENT_READ_ITEM_SIZE
6868
: BootstrapConstants.EVENTUALLY_CONSISTENT_READ_ITEM_SIZE;
6969

70-
lastConsumedCapacity = (int) ((result.getScannedCount() / Math.max(1.0, result.getCount()))
71-
* (ItemSizeCalculator.calculateScanResultSizeInBytes(result) / itemSize));
70+
lastConsumedCapacity = (result.getScannedCount() / (int) Math.max(1.0, result.getCount()))
71+
* (ItemSizeCalculator.calculateScanResultSizeInBytes(result) / itemSize);
7272
}
7373

7474
if (result.getLastEvaluatedKey() != null

src/main/java/com/amazonaws/dynamodb/bootstrap/constants/BootstrapConstants.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public class BootstrapConstants {
3838
.getRuntime().availableProcessors() * 128;
3939

4040
/**
41-
* Seconds to wait for thread to terminate before retrying or exiting.
41+
* Core pool size of a default thread pool.
4242
*/
4343
public static final int DYNAMODB_CLIENT_EXECUTOR_CORE_POOL_SIZE = Runtime
4444
.getRuntime().availableProcessors() * 4;
@@ -83,7 +83,7 @@ public class BootstrapConstants {
8383
/**
8484
* The size of an empty document in a DynamoDB item.
8585
*/
86-
public static int LOGICAL_SIZE_OF_EMPTY_DOCUMENT = 3;
86+
public static final int LOGICAL_SIZE_OF_EMPTY_DOCUMENT = 3;
8787

8888
/**
8989
* Max number of bytes in a DynamoDB number attribute.
@@ -100,4 +100,13 @@ public class BootstrapConstants {
100100
*/
101101
public static final int EVENTUALLY_CONSISTENT_READ_ITEM_SIZE = 2 * STRONGLY_CONSISTENT_READ_ITEM_SIZE;
102102

103+
/**
104+
* Max scan result size
105+
*/
106+
public static final int SCAN_LIMIT = 1000;
107+
108+
/**
109+
* Max connection size limit
110+
*/
111+
public static final int MAX_CONN_SIZE = 5000;
103112
}

src/main/resources/log4j.properties

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
log4j.rootLogger=INFO, CONSOLE
2+
3+
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
4+
log4j.appender.CONSOLE.layout=org.apache.log4j.EnhancedPatternLayout
5+
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %40C - %m%n%throwable
6+
log4j.appender.CONSOLE.threshold=INFO
7+
log4j.appender.CONSOLE.filter.1=org.apache.log4j.varia.StringMatchFilter
8+
log4j.appender.CONSOLE.filter.1.StringToMatch=has no children
9+
log4j.appender.CONSOLE.filter.1.AcceptOnMatch=false

0 commit comments

Comments
 (0)