13 changes: 13 additions & 0 deletions .idea/codeStyleSettings.xml

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions build.gradle
@@ -36,6 +36,7 @@ dependencies {
compile 'com.univocity:univocity-parsers:2.1.1'
compile 'org.apache.commons:commons-lang3:3.0'
compile group: 'com.googlecode.json-simple', name: 'json-simple', version: '1.1.1'
compile group: 'me.xdrop', name: 'fuzzywuzzy', version: '1.1.5'
}

task uberloader(type: Jar) {
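The new me.xdrop:fuzzywuzzy dependency is what backs the header-to-column fuzzy matching introduced in this PR. As a quick orientation (not part of the diff, and only a sketch assuming fuzzywuzzy's FuzzySearch.ratio API), the library scores string similarity from 0 to 100, which is the kind of comparison the loader can use to map CSV header tokens onto table column names:

import me.xdrop.fuzzywuzzy.FuzzySearch;

// Illustration only: shows why a header token such as "Passenger Class"
// lands on the column passenger_class rather than an unrelated column.
public class FuzzyScoreDemo {
    public static void main(String[] args) {
        // High score: the header token closely resembles the intended column name.
        System.out.println(FuzzySearch.ratio("Passenger Class", "passenger_class"));
        // Much lower score for an unrelated column name.
        System.out.println(FuzzySearch.ratio("Passenger Class", "port_of_embarkation"));
    }
}

The sample script sample/fuzzy_load.sh below exercises this path: it names the keyspace and table instead of passing a full -schema, and relies on -skipRows 1 to expose the header row for matching.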
1 change: 1 addition & 0 deletions sample/fuzzy_load.sh
@@ -0,0 +1 @@
../build/cassandra-loader -f fuzzy_titanic.csv -host localhost -keyspace titanic -table survivors -boolStyle 1_0 -skipRows 1
892 changes: 892 additions & 0 deletions sample/fuzzy_titanic.csv

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion sample/load.sh
@@ -1 +1 @@
../build/cassandra-loader -f titanic.csv -host localhost -schema "titanic.survivors(id, survived, passenger_class, name, sex, age, num_siblings_spouse, num_parents_children, ticket_id, fare, cabin, port_of_embarkation)" -delimInQuotes true -boolStyle 1_0
../build/cassandra-loader -f titanic.csv -host localhost -schema "titanic.survivors(id, survived, passenger_class, name, sex, age, num_siblings_spouse, num_parents_children, ticket_id, fare, cabin, port_of_embarkation)" -boolStyle 1_0
169 changes: 82 additions & 87 deletions src/main/java/com/datastax/loader/CqlDelimLoad.java
@@ -15,73 +15,54 @@
*/
package com.datastax.loader;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.JdkSSLOptions;
import com.datastax.driver.core.Metrics;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.SSLOptions;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.datastax.loader.parser.BooleanParser;
import com.datastax.loader.futures.FutureManager;
import com.datastax.loader.futures.PrintingFutureSet;

import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import java.util.HashSet;
import java.util.Deque;
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Arrays;
import java.util.Locale;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.io.File;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.FileInputStream;
import java.io.BufferedOutputStream;
import java.io.PrintStream;
import java.io.FileNotFoundException;
import java.text.ParseException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.security.KeyStoreException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.text.ParseException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Metrics;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.ProtocolOptions;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.SSLOptions;
import com.datastax.driver.core.JdkSSLOptions;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;

import com.codahale.metrics.Timer;
import org.apache.commons.lang3.StringEscapeUtils;

public class CqlDelimLoad {
private String version = "0.0.21";
private String host = null;
@@ -114,7 +95,6 @@ public class CqlDelimLoad {
private long maxErrors = 10;
private long skipRows = 0;
private String skipCols = null;

private long maxRows = -1;
private String badDir = ".";
private String filename = null;
@@ -133,6 +113,7 @@ public class CqlDelimLoad {
private int numThreads = Runtime.getRuntime().availableProcessors();
private int batchSize = 1;
private boolean nullsUnset = false;
private boolean fuzzyMatch = false;

private String usage() {
StringBuilder usage = new StringBuilder("version: ").append(version).append("\n");
@@ -183,16 +164,19 @@ private String usage() {
usage.append("cassandra-loader -f stdin -host localhost -schema \"test.test3(a, b, c)\" -user myuser -pw mypassword\n");
return usage.toString();
}

private boolean validateArgs() {
if (format.equalsIgnoreCase("delim")) {
if (null == cqlSchema) {
System.err.println("If you specify format " + format + " you must provide a schema");
if (null == cqlSchema && skipRows > 0) {
System.err.println("No schema was specified but there is a header, attempting to fuzzy match against header");
fuzzyMatch=true;
} else if (null == cqlSchema){
System.err.println("If you specify format " + format + " but do not have a header (skipRows) you must provide a schema");
return false;
}
if (null != keyspace)
else if (null != keyspace)
System.err.println("Format is " + format + ", ignoring keyspace");
if (null != table)
else if (null != table)
System.err.println("Format is " + format + ", ignoring table");
}
else if (format.equalsIgnoreCase("jsonline")
@@ -363,7 +347,7 @@ private boolean processConfigFile(String fname, Map<String, String> amap)
}
return true;
}

private boolean parseArgs(String[] args) throws IOException, FileNotFoundException {
String tkey;
if (args.length == 0) {
@@ -447,7 +431,7 @@ private boolean parseArgs(String[] args) throws IOException, FileNotFoundExcepti
maxErrors = Long.MAX_VALUE;
if (-1 == maxInsertErrors)
maxInsertErrors = Long.MAX_VALUE;

if (!amap.isEmpty()) {
for (String k : amap.keySet())
System.err.println("Unrecognized option: " + k);
@@ -472,7 +456,7 @@ private String quote(String instr) {
return "\"" + ret + "\"";
}

private SSLOptions createSSLOptions()
private SSLOptions createSSLOptions()
throws KeyStoreException, FileNotFoundException, IOException, NoSuchAlgorithmException,
KeyManagementException, CertificateException, UnrecoverableKeyException {
TrustManagerFactory tmf = null;
@@ -481,7 +465,7 @@ private SSLOptions createSSLOptions()
truststorePwd.toCharArray());
tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(tks);

KeyManagerFactory kmf = null;
if (null != keystorePath) {
KeyStore kks = KeyStore.getInstance("JKS");
@@ -503,27 +487,39 @@ private boolean setup()
throws IOException, KeyStoreException, NoSuchAlgorithmException, KeyManagementException,
CertificateException, UnrecoverableKeyException {
// Connect to Cassandra
PoolingOptions pOpts = new PoolingOptions();
pOpts.setMaxConnectionsPerHost(HostDistance.LOCAL, 8);
pOpts.setCoreConnectionsPerHost(HostDistance.LOCAL, 8);
Cluster.Builder clusterBuilder = Cluster.builder()
.addContactPoint(host)
.withPort(port)
//.withCompression(ProtocolOptions.Compression.LZ4)
.withPoolingOptions(pOpts)
.withLoadBalancingPolicy(new TokenAwarePolicy( DCAwareRoundRobinPolicy.builder().build()))
;

if (null != username)
clusterBuilder = clusterBuilder.withCredentials(username, password);
if (null != truststorePath)
clusterBuilder = clusterBuilder.withSSL(createSSLOptions());

cluster = clusterBuilder.build();
if (null == cluster) {
throw new IOException("Could not create cluster");
}
Session tsession = cluster.connect();
Session tsession = null;
try {
PoolingOptions pOpts = new PoolingOptions();
pOpts.setMaxConnectionsPerHost(HostDistance.LOCAL, 8);
pOpts.setCoreConnectionsPerHost(HostDistance.LOCAL, 8);
Cluster.Builder clusterBuilder = Cluster.builder()
.addContactPoint(host)
.withPort(port)
//.withCompression(ProtocolOptions.Compression.LZ4)
.withPoolingOptions(pOpts)
.withLoadBalancingPolicy(new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build()));

if (null != username)
clusterBuilder = clusterBuilder.withCredentials(username, password);
if (null != truststorePath)
clusterBuilder = clusterBuilder.withSSL(createSSLOptions());

cluster = clusterBuilder.build();
if (null == cluster) {
throw new IOException("Could not create cluster");
}
tsession = cluster.connect();
}
catch (IllegalArgumentException e) {
System.err.println("Could not connect to the cluster; check your hosts");
//e.printStackTrace();
System.exit(1);
}
catch (Exception e) {
e.printStackTrace();
System.exit(1);
}

if ((0 > cluster.getConfiguration().getProtocolOptions()
.getProtocolVersion().compareTo(ProtocolVersion.V4))
@@ -559,7 +555,6 @@ private void cleanup() {
if (null != cluster)
cluster.close();
}

public boolean run(String[] args)
throws IOException, ParseException, InterruptedException, ExecutionException, KeyStoreException,
NoSuchAlgorithmException, KeyManagementException, CertificateException,
@@ -573,7 +568,7 @@ public boolean run(String[] args)
// Setup
if (false == setup())
return false;

// open file
Deque<File> fileList = new ArrayDeque<File>();
File infile = null;
@@ -621,7 +616,7 @@ public int compare(File f1, File f2) {
maxInsertErrors,
successDir, failureDir,
nullsUnset, format,
keyspace, table);
keyspace, table, fuzzyMatch);
Future<Long> res = executor.submit(worker);
total = res.get();
executor.shutdown();
@@ -646,7 +641,7 @@ public int compare(File f1, File f2) {
maxInsertErrors,
successDir, failureDir,
nullsUnset, format,
keyspace, table);
keyspace, table, fuzzyMatch);
results.add(executor.submit(worker));
}
executor.shutdown();
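Taken together, the CqlDelimLoad.java changes above boil down to one new decision in validateArgs(): when -schema is absent but -skipRows indicates a header row, the loader switches to fuzzy matching instead of failing, and the resulting fuzzyMatch flag is passed to every CqlDelimLoadTask. A distilled restatement of that decision (illustration only, simplified from the diff; not a drop-in replacement for the PR's method):

// Distilled form of the new validateArgs() logic for the "delim" format.
final class FuzzyMatchDecision {
    static boolean shouldFuzzyMatch(String cqlSchema, long skipRows) {
        // No -schema, but a header row will be skipped: fuzzy match against it.
        return cqlSchema == null && skipRows > 0;
    }

    static boolean delimArgsValid(String cqlSchema, long skipRows) {
        // Invalid only when there is neither a schema nor a header row to match against.
        return cqlSchema != null || skipRows > 0;
    }
}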
30 changes: 21 additions & 9 deletions src/main/java/com/datastax/loader/CqlDelimLoadTask.java
@@ -22,13 +22,9 @@
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.loader.futures.FutureManager;
import com.datastax.loader.futures.PrintingFutureSet;
import com.datastax.loader.futures.JsonPrintingFutureSet;
import com.datastax.loader.futures.PrintingFutureSet;
import com.datastax.loader.parser.BooleanParser;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;

import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
@@ -46,6 +42,9 @@
import java.util.Locale;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;

class CqlDelimLoadTask implements Callable<Long> {
private String BADPARSE = ".BADPARSE";
@@ -94,6 +93,7 @@ class CqlDelimLoadTask implements Callable<Long> {
private String keyspace = null;
private String table = null;
private JSONArray jsonArray;
private boolean fuzzyMatch;

public CqlDelimLoadTask(String inCqlSchema, String inDelimiter,
int inCharsPerColumn,
@@ -108,7 +108,7 @@ public CqlDelimLoadTask(String inCqlSchema, String inDelimiter,
int inQueryTimeout, long inMaxInsertErrors,
String inSuccessDir, String inFailureDir,
boolean inNullsUnset, String inFormat,
String inKeyspace, String inTable) {
String inKeyspace, String inTable, boolean inFuzzyMatch) {
super();
cqlSchema = inCqlSchema;
delimiter = inDelimiter;
@@ -136,6 +136,7 @@ public CqlDelimLoadTask(String inCqlSchema, String inDelimiter,
format = inFormat;
keyspace = inKeyspace;
table = inTable;
fuzzyMatch = inFuzzyMatch;
}

public Long call() throws IOException, ParseException, org.json.simple.parser.ParseException {
@@ -168,18 +169,29 @@ private void setup() throws IOException, ParseException, org.json.simple.parser.
logPrinter = new PrintStream(new BufferedOutputStream(new FileOutputStream(logFname)));
}

if (format.equalsIgnoreCase("delim")) {


if (keyspace == null) {
cdp = new CqlDelimParser(cqlSchema, delimiter, charsPerColumn,
nullString,
dateFormatString, boolStyle, locale,
skipCols, session, true);
}
else if (format.equalsIgnoreCase("jsonline")
|| format.equalsIgnoreCase("jsonarray")) {
else {
cdp = new CqlDelimParser(keyspace, table, delimiter, charsPerColumn,
nullString,
dateFormatString, boolStyle, locale,
skipCols, session, true);
if (fuzzyMatch) {
// TODO: is this a good read-ahead limit?
reader.mark(20000);
String cqlSchema = new CqlSchemaFuzzyMatcher().match(keyspace, table, reader, delimiter, cdp);
reader.reset();
cdp = new CqlDelimParser(cqlSchema, delimiter, charsPerColumn,
nullString,
dateFormatString, boolStyle, locale,
skipCols, session, true);
}
}

insert = cdp.generateInsert();
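The CqlSchemaFuzzyMatcher class that setup() calls above is added elsewhere in this PR and is not shown in this excerpt. As a hedged sketch only, assuming it reads the header row, fuzzy-matches each header token against the table's column names with fuzzywuzzy's FuzzySearch.extractOne, and returns a schema string of the form keyspace.table(col1, col2, ...), an equivalent matcher might look like the class below. The real signature also takes the CqlDelimParser; here the column names are passed in directly to keep the example self-contained, and all names are hypothetical.

import java.io.BufferedReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import me.xdrop.fuzzywuzzy.FuzzySearch;
import me.xdrop.fuzzywuzzy.model.ExtractedResult;

// Hypothetical stand-in for the PR's CqlSchemaFuzzyMatcher; illustration only,
// not the PR's actual implementation.
class HeaderFuzzyMatcher {
    String match(String keyspace, String table, BufferedReader reader,
                 String delimiter, List<String> columnNames) throws IOException {
        String header = reader.readLine();               // consume the header row
        List<String> matched = new ArrayList<String>();
        for (String token : header.split(delimiter)) {
            // Pick the table column whose name scores highest against this header token.
            ExtractedResult best = FuzzySearch.extractOne(token.trim(), columnNames);
            matched.add(best.getString());
        }
        return keyspace + "." + table + "(" + String.join(", ", matched) + ")";
    }
}

One caveat on the mark/reset pattern in the diff: BufferedReader.reset() fails once more than the marked read-ahead limit (20000 characters here) has been consumed, so the header line must fit within that limit, which appears to be what the TODO above is flagging.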