Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, I do not really get why do you changed the single StringBuilder query to StringBuilder fieldsBuilder, StringBuilder placeholdersBuilder and StringBuilder updatesBuilder with the overcomplicated query building? One single for loop appends different parts to different StringBuilders. The original version was already a PreparedStatement. I can't see any additional value replacing it. Finally it is the same SQL query generated with more resource is used.
Am I missing something?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original intention of the issue was to replace direct String concat with parameter replacements and pre-build queries. The SQL strings could be prepared on init and just looked up (might open the was for a more sophisticated solution using a connection pool).

Side note: One would need to sanitize table names, too, because they cannot be set as ? In a prepared statement, so might be good to add a simple check.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Locale;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.metric.api.MultiCountMetric;
Expand All @@ -35,33 +36,26 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Stores URL and selected metadata into a SQL table * */
/** Stores URL and selected metadata into a SQL table */
public class IndexerBolt extends AbstractIndexerBolt {

private static final Logger LOG = LoggerFactory.getLogger(IndexerBolt.class);

public static final String SQL_INDEX_TABLE_PARAM_NAME = "sql.index.table";

private OutputCollector _collector;

private MultiCountMetric eventCounter;

private Connection connection;

private String tableName;

private Map<String, Object> conf;

@Override
public void prepare(
Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
super.prepare(conf, context, collector);
_collector = collector;

this.eventCounter = context.registerMetric("SQLIndexer", new MultiCountMetric(), 10);

this.tableName = ConfUtils.getString(conf, SQL_INDEX_TABLE_PARAM_NAME);

this.conf = conf;
}

Expand All @@ -87,39 +81,32 @@ public void execute(Tuple tuple) {
}

try {

// which metadata to display?
Map<String, String[]> keyVals = filterMetadata(metadata);

StringBuilder query =
new StringBuilder(" insert into ")
.append(tableName)
.append(" (")
.append(fieldNameForURL());

Object[] keys = keyVals.keySet().toArray();

for (Object o : keys) {
query.append(", ").append((String) o);
}
// Build SQL statement with prepared statement
StringBuilder fieldsBuilder = new StringBuilder(fieldNameForURL());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @sigee here. We shouldn't create a lot of StringBuilders here and instead use parameter replacement were needed. In addition, I would add a simple (regex) check for the table name to avoid anything unexpected.

StringBuilder placeholdersBuilder = new StringBuilder("?");
StringBuilder updatesBuilder = new StringBuilder();

query.append(") values(?");

for (int i = 0; i < keys.length; i++) {
query.append(", ?");
}

query.append(")");

query.append(" ON DUPLICATE KEY UPDATE ");
for (int i = 0; i < keys.length; i++) {
String key = (String) keys[i];
if (i > 0) {
query.append(", ");
}
query.append(key).append("=VALUES(").append(key).append(")");
fieldsBuilder.append(", ").append(key);
placeholdersBuilder.append(", ?");
if (i > 0) updatesBuilder.append(", ");
updatesBuilder.append(key).append("=VALUES(").append(key).append(")");
}

String sql =
String.format(
Locale.ROOT,
"INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s",
tableName,
fieldsBuilder,
placeholdersBuilder,
updatesBuilder);

if (connection == null) {
try {
connection = SQLUtil.getConnection(conf);
Expand All @@ -129,29 +116,30 @@ public void execute(Tuple tuple) {
}
}

LOG.debug("PreparedStatement => {}", query);
LOG.debug("PreparedStatement => {}", sql);

// create the mysql insert preparedstatement
PreparedStatement preparedStmt = connection.prepareStatement(query.toString());
// Create the MySQL insert PreparedStatement
PreparedStatement preparedStmt = connection.prepareStatement(sql);

// TODO store the text of the document?
if (StringUtils.isNotBlank(fieldNameForText())) {
// builder.field(fieldNameForText(), trimText(text));
}

// send URL as field?
// Send URL as first parameter
if (fieldNameForURL() != null) {
preparedStmt.setString(1, normalisedurl);
}

// Send metadata values
for (int i = 0; i < keys.length; i++) {
insert(preparedStmt, i + 2, (String) keys[i], keyVals);
}

preparedStmt.executeUpdate();
preparedStmt.close();

eventCounter.scope("Indexed").incrBy(1);

_collector.emit(StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED));
_collector.ack(tuple);

Expand All @@ -164,6 +152,7 @@ public void execute(Tuple tuple) {
try {
connection.close();
} catch (SQLException e1) {
// ignore
}
connection = null;
}
Expand All @@ -180,11 +169,11 @@ private void insert(
String value = "";
if (values == null || values.length == 0) {
LOG.info("No values found for label {}", label);
} else if (values.length > 1) {
LOG.info("More than one value found for label {}", label);
value = values[0];
} else {
value = values[0];
if (values.length > 1) {
LOG.info("More than one value found for label {}", label);
}
}
preparedStmt.setString(position, value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
Expand All @@ -48,12 +48,12 @@ public class SQLSpout extends AbstractQueryingSpout {
private Connection connection;

/**
* if more than one instance of the spout exist, each one is in charge of a separate bucket
* If more than one instance of the spout exist, each one is in charge of a separate bucket
* value. This is used to ensure a good diversity of URLs.
*/
private int bucketNum = -1;

/** Used to distinguish between instances in the logs * */
/** Used to distinguish between instances in the logs */
protected String logIdprefix = "";

private int maxDocsPerBucket;
Expand All @@ -69,9 +69,7 @@ public void open(
super.open(conf, context, collector);

maxDocsPerBucket = ConfUtils.getInt(conf, Constants.SQL_MAX_DOCS_BUCKET_PARAM_NAME, 5);

tableName = ConfUtils.getString(conf, Constants.SQL_STATUS_TABLE_PARAM_NAME, "urls");

maxNumResults = ConfUtils.getInt(conf, Constants.SQL_MAXRESULTS_PARAM_NAME, 100);

try {
Expand All @@ -81,7 +79,7 @@ public void open(
throw new RuntimeException(ex);
}

// determine bucket this spout instance will be in charge of
// Determine bucket this spout instance will be in charge of
int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
if (totalTasks > 1) {
logIdprefix =
Expand Down Expand Up @@ -113,73 +111,80 @@ protected void populateBuffer() {
}
}

// select entries from mysql
// https://mariadb.com/kb/en/library/window-functions-overview/
// http://www.mysqltutorial.org/mysql-window-functions/mysql-rank-function/

String query =
"SELECT * from (select rank() over (partition by host order by nextfetchdate desc, url) as ranking, url, metadata, nextfetchdate from "
+ tableName;
int alreadyprocessed = 0;
int numhits = 0;
long timeStartQuery = System.currentTimeMillis();

query +=
" WHERE nextfetchdate <= '" + new Timestamp(lastNextFetchDate.toEpochMilli()) + "'";
PreparedStatement pstmt = null;
ResultSet rs = null;

// constraint on bucket num
if (bucketNum >= 0) {
query += " AND bucket = '" + bucketNum + "'";
}
try {
// Select entries from MySQL
// https://mariadb.com/kb/en/library/window-functions-overview/
// http://www.mysqltutorial.org/mysql-window-functions/mysql-rank-function/
StringBuilder queryBuilder = new StringBuilder();
queryBuilder.append("SELECT * FROM (");
queryBuilder.append(
"SELECT RANK() OVER (PARTITION BY host ORDER BY nextfetchdate DESC, url) AS ranking, ");
queryBuilder.append("url, metadata, nextfetchdate FROM ").append(tableName);
queryBuilder.append(" WHERE nextfetchdate <= ?");
if (bucketNum >= 0) {
queryBuilder.append(" AND bucket = ?");
}
queryBuilder.append(") AS urls_ranks WHERE urls_ranks.ranking <= ? ");
if (maxNumResults != -1) {
queryBuilder.append("ORDER BY ranking LIMIT ?");
} else {
queryBuilder.append("ORDER BY ranking");
}

query +=
") as urls_ranks where (urls_ranks.ranking <= "
+ maxDocsPerBucket
+ ") order by ranking";
String query = queryBuilder.toString();
LOG.debug("{} SQL query: {}", logIdprefix, query);

if (maxNumResults != -1) {
query += " LIMIT " + this.maxNumResults;
}
pstmt = connection.prepareStatement(query);

int alreadyprocessed = 0;
int numhits = 0;
int paramIndex = 1;
pstmt.setTimestamp(paramIndex++, new Timestamp(lastNextFetchDate.toEpochMilli()));

long timeStartQuery = System.currentTimeMillis();
if (bucketNum >= 0) {
pstmt.setInt(paramIndex++, bucketNum);
}

// create the java statement
Statement st = null;
ResultSet rs = null;
try {
st = this.connection.createStatement();
pstmt.setInt(paramIndex++, maxDocsPerBucket);

// dump query to log
LOG.debug("{} SQL query {}", logIdprefix, query);
if (maxNumResults != -1) {
pstmt.setInt(paramIndex++, maxNumResults);
}

// execute the query, and get a java resultset
rs = st.executeQuery(query);
rs = pstmt.executeQuery();

long timeTaken = System.currentTimeMillis() - timeStartQuery;
queryTimes.addMeasurement(timeTaken);

// iterate through the java resultset
while (rs.next()) {
String url = rs.getString("url");
numhits++;
// already processed? skip

// Already processed? Skip
if (beingProcessed.containsKey(url)) {
alreadyprocessed++;
continue;
}

String metadata = rs.getString("metadata");
if (metadata == null) {
metadata = "";
} else if (!metadata.startsWith("\t")) {
metadata = "\t" + metadata;
}

String URLMD = url + metadata;
List<Object> v =
SCHEME.deserialize(ByteBuffer.wrap(URLMD.getBytes(StandardCharsets.UTF_8)));
buffer.add(url, (Metadata) v.get(1));
}

// no results? reset the date
// No results? Reset the date
if (numhits == 0) {
lastNextFetchDate = null;
}
Expand All @@ -204,9 +209,9 @@ protected void populateBuffer() {
LOG.error("Exception closing resultset", e);
}
try {
if (st != null) st.close();
if (pstmt != null) pstmt.close();
} catch (SQLException e) {
LOG.error("Exception closing statement", e);
LOG.error("Exception closing prepared statement", e);
}
}
}
Expand Down