From 3f6a2a73438c19043c42b1fc784e6895deac2385 Mon Sep 17 00:00:00 2001 From: Carl Camilleri Date: Sun, 19 May 2024 16:05:58 +0200 Subject: [PATCH 1/5] Introduce new binding class --- bin/bindings.properties | 1 + .../MultiPostgreNoSQLDBClient.java | 409 ++++++++++++++++++ 2 files changed, 410 insertions(+) create mode 100644 postgrenosql/src/main/java/site/ycsb/postgrenosql/MultiPostgreNoSQLDBClient.java diff --git a/bin/bindings.properties b/bin/bindings.properties index 5c767c7599..0dda4a82b9 100755 --- a/bin/bindings.properties +++ b/bin/bindings.properties @@ -62,6 +62,7 @@ mongodb-async:site.ycsb.db.AsyncMongoDbClient nosqldb:site.ycsb.db.NoSqlDbClient orientdb:site.ycsb.db.OrientDBClient postgrenosql:site.ycsb.postgrenosql.PostgreNoSQLDBClient +postgrenosql-multi:site.ycsb.postgrenosql.MultiPostgreNoSQLDBClient rados:site.ycsb.db.RadosClient redis:site.ycsb.db.RedisClient rest:site.ycsb.webservice.rest.RestClient diff --git a/postgrenosql/src/main/java/site/ycsb/postgrenosql/MultiPostgreNoSQLDBClient.java b/postgrenosql/src/main/java/site/ycsb/postgrenosql/MultiPostgreNoSQLDBClient.java new file mode 100644 index 0000000000..54a60e08cb --- /dev/null +++ b/postgrenosql/src/main/java/site/ycsb/postgrenosql/MultiPostgreNoSQLDBClient.java @@ -0,0 +1,409 @@ +/* + * Copyright 2017 YCSB Contributors. All Rights Reserved. + * + * CODE IS BASED ON the jdbc-binding JdbcDBClient class. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
See accompanying + * LICENSE file. + */ +package site.ycsb.postgrenosql; + +import site.ycsb.*; +import org.json.simple.JSONObject; +import org.postgresql.Driver; +import org.postgresql.util.PGobject; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.*; + +/** + * PostgreNoSQL client for YCSB framework. + */ +public class MultiPostgreNoSQLDBClient extends DB { + private static final Logger LOG = LoggerFactory.getLogger(MultiPostgreNoSQLDBClient.class); + + /** Count the number of times initialized to teardown on the last. */ + private static final AtomicInteger INIT_COUNT = new AtomicInteger(0); + + /** Cache for already prepared statements. */ + private static ConcurrentMap cachedStatements; + + /** The driver to get the connection to postgresql. */ + private static Driver postgrenosqlDriver; + + /** The connection to the database. */ + private static Connection connection; + + /** The class to use as the jdbc driver. */ + public static final String DRIVER_CLASS = "db.driver"; + + /** The URL to connect to the database. */ + public static final String CONNECTION_URL = "postgrenosql.url"; + + /** The user name to use to connect to the database. */ + public static final String CONNECTION_USER = "postgrenosql.user"; + + /** The password to use for establishing the connection. */ + public static final String CONNECTION_PASSWD = "postgrenosql.passwd"; + + /** The JDBC connection auto-commit property for the driver. */ + public static final String JDBC_AUTO_COMMIT = "postgrenosql.autocommit"; + + /** The primary key in the user table. */ + public static final String PRIMARY_KEY = "YCSB_KEY"; + + /** The field name prefix in the table. 
*/ + public static final String COLUMN_NAME = "YCSB_VALUE"; + + private static final String DEFAULT_PROP = ""; + + /** Returns parsed boolean value from the properties if set, otherwise returns defaultVal. */ + private static boolean getBoolProperty(Properties props, String key, boolean defaultVal) { + String valueStr = props.getProperty(key); + if (valueStr != null) { + return Boolean.parseBoolean(valueStr); + } + return defaultVal; + } + + @Override + public void init() throws DBException { + INIT_COUNT.incrementAndGet(); + synchronized (MultiPostgreNoSQLDBClient.class) { + if (postgrenosqlDriver != null) { + return; + } + + Properties props = getProperties(); + String urls = props.getProperty(CONNECTION_URL, DEFAULT_PROP); + String user = props.getProperty(CONNECTION_USER, DEFAULT_PROP); + String passwd = props.getProperty(CONNECTION_PASSWD, DEFAULT_PROP); + boolean autoCommit = getBoolProperty(props, JDBC_AUTO_COMMIT, true); + + try { + Properties tmpProps = new Properties(); + tmpProps.setProperty("user", user); + tmpProps.setProperty("password", passwd); + + cachedStatements = new ConcurrentHashMap<>(); + + postgrenosqlDriver = new Driver(); + connection = postgrenosqlDriver.connect(urls, tmpProps); + connection.setAutoCommit(autoCommit); + + } catch (Exception e) { + LOG.error("Error during initialization: " + e); + } + } + } + + @Override + public void cleanup() throws DBException { + if (INIT_COUNT.decrementAndGet() == 0) { + try { + cachedStatements.clear(); + + if (!connection.getAutoCommit()){ + connection.commit(); + } + connection.close(); + } catch (SQLException e) { + System.err.println("Error in cleanup execution. 
" + e); + } + postgrenosqlDriver = null; + } + } + + @Override + public Status read(String tableName, String key, Set fields, Map result) { + try { + StatementType type = new StatementType(StatementType.Type.READ, tableName, fields); + PreparedStatement readStatement = cachedStatements.get(type); + if (readStatement == null) { + readStatement = createAndCacheReadStatement(type); + } + readStatement.setString(1, key); + ResultSet resultSet = readStatement.executeQuery(); + if (!resultSet.next()) { + resultSet.close(); + return Status.NOT_FOUND; + } + + if (result != null) { + if (fields == null){ + do{ + String field = resultSet.getString(2); + String value = resultSet.getString(3); + result.put(field, new StringByteIterator(value)); + }while (resultSet.next()); + } else { + for (String field : fields) { + String value = resultSet.getString(field); + result.put(field, new StringByteIterator(value)); + } + } + } + resultSet.close(); + return Status.OK; + + } catch (SQLException e) { + LOG.error("Error in processing read of table " + tableName + ": " + e); + return Status.ERROR; + } + } + + @Override + public Status scan(String tableName, String startKey, int recordcount, Set fields, + Vector> result) { + try { + StatementType type = new StatementType(StatementType.Type.SCAN, tableName, fields); + PreparedStatement scanStatement = cachedStatements.get(type); + if (scanStatement == null) { + scanStatement = createAndCacheScanStatement(type); + } + scanStatement.setString(1, startKey); + scanStatement.setInt(2, recordcount); + ResultSet resultSet = scanStatement.executeQuery(); + for (int i = 0; i < recordcount && resultSet.next(); i++) { + if (result != null && fields != null) { + HashMap values = new HashMap(); + for (String field : fields) { + String value = resultSet.getString(field); + values.put(field, new StringByteIterator(value)); + } + + result.add(values); + } + } + + resultSet.close(); + return Status.OK; + } catch (SQLException e) { + LOG.error("Error in 
processing scan of table: " + tableName + ": " + e); + return Status.ERROR; + } + } + + @Override + public Status update(String tableName, String key, Map values) { + try{ + StatementType type = new StatementType(StatementType.Type.UPDATE, tableName, null); + PreparedStatement updateStatement = cachedStatements.get(type); + if (updateStatement == null) { + updateStatement = createAndCacheUpdateStatement(type); + } + + JSONObject jsonObject = new JSONObject(); + for (Map.Entry entry : values.entrySet()) { + jsonObject.put(entry.getKey(), entry.getValue().toString()); + } + + PGobject object = new PGobject(); + object.setType("jsonb"); + object.setValue(jsonObject.toJSONString()); + + updateStatement.setObject(1, object); + updateStatement.setString(2, key); + + int result = updateStatement.executeUpdate(); + if (result == 1) { + return Status.OK; + } + return Status.UNEXPECTED_STATE; + } catch (SQLException e) { + LOG.error("Error in processing update to table: " + tableName + e); + return Status.ERROR; + } + } + + @Override + public Status insert(String tableName, String key, Map values) { + try{ + StatementType type = new StatementType(StatementType.Type.INSERT, tableName, null); + PreparedStatement insertStatement = cachedStatements.get(type); + if (insertStatement == null) { + insertStatement = createAndCacheInsertStatement(type); + } + + JSONObject jsonObject = new JSONObject(); + for (Map.Entry entry : values.entrySet()) { + jsonObject.put(entry.getKey(), entry.getValue().toString()); + } + + PGobject object = new PGobject(); + object.setType("jsonb"); + object.setValue(jsonObject.toJSONString()); + + insertStatement.setObject(2, object); + insertStatement.setString(1, key); + + int result = insertStatement.executeUpdate(); + if (result == 1) { + return Status.OK; + } + + return Status.UNEXPECTED_STATE; + } catch (SQLException e) { + LOG.error("Error in processing insert to table: " + tableName + ": " + e); + return Status.ERROR; + } + } + + @Override + public 
Status delete(String tableName, String key) { + try{ + StatementType type = new StatementType(StatementType.Type.DELETE, tableName, null); + PreparedStatement deleteStatement = cachedStatements.get(type); + if (deleteStatement == null) { + deleteStatement = createAndCacheDeleteStatement(type); + } + deleteStatement.setString(1, key); + + int result = deleteStatement.executeUpdate(); + if (result == 1){ + return Status.OK; + } + + return Status.UNEXPECTED_STATE; + } catch (SQLException e) { + LOG.error("Error in processing delete to table: " + tableName + e); + return Status.ERROR; + } + } + + private PreparedStatement createAndCacheReadStatement(StatementType readType) + throws SQLException{ + PreparedStatement readStatement = connection.prepareStatement(createReadStatement(readType)); + PreparedStatement statement = cachedStatements.putIfAbsent(readType, readStatement); + if (statement == null) { + return readStatement; + } + return statement; + } + + private String createReadStatement(StatementType readType){ + StringBuilder read = new StringBuilder("SELECT " + PRIMARY_KEY + " AS " + PRIMARY_KEY); + + if (readType.getFields() == null) { + read.append(", (jsonb_each_text(" + COLUMN_NAME + ")).*"); + } else { + for (String field:readType.getFields()){ + read.append(", " + COLUMN_NAME + "->>'" + field + "' AS " + field); + } + } + + read.append(" FROM " + readType.getTableName()); + read.append(" WHERE "); + read.append(PRIMARY_KEY); + read.append(" = "); + read.append("?"); + return read.toString(); + } + + private PreparedStatement createAndCacheScanStatement(StatementType scanType) + throws SQLException{ + PreparedStatement scanStatement = connection.prepareStatement(createScanStatement(scanType)); + PreparedStatement statement = cachedStatements.putIfAbsent(scanType, scanStatement); + if (statement == null) { + return scanStatement; + } + return statement; + } + + private String createScanStatement(StatementType scanType){ + StringBuilder scan = new 
StringBuilder("SELECT " + PRIMARY_KEY + " AS " + PRIMARY_KEY); + if (scanType.getFields() != null){ + for (String field:scanType.getFields()){ + scan.append(", " + COLUMN_NAME + "->>'" + field + "' AS " + field); + } + } + scan.append(" FROM " + scanType.getTableName()); + scan.append(" WHERE "); + scan.append(PRIMARY_KEY); + scan.append(" >= ?"); + scan.append(" ORDER BY "); + scan.append(PRIMARY_KEY); + scan.append(" LIMIT ?"); + + return scan.toString(); + } + + public PreparedStatement createAndCacheUpdateStatement(StatementType updateType) + throws SQLException{ + PreparedStatement updateStatement = connection.prepareStatement(createUpdateStatement(updateType)); + PreparedStatement statement = cachedStatements.putIfAbsent(updateType, updateStatement); + if (statement == null) { + return updateStatement; + } + return statement; + } + + private String createUpdateStatement(StatementType updateType){ + StringBuilder update = new StringBuilder("UPDATE "); + update.append(updateType.getTableName()); + update.append(" SET "); + update.append(COLUMN_NAME + " = " + COLUMN_NAME); + update.append(" || ? 
"); + update.append(" WHERE "); + update.append(PRIMARY_KEY); + update.append(" = ?"); + return update.toString(); + } + + private PreparedStatement createAndCacheInsertStatement(StatementType insertType) + throws SQLException{ + PreparedStatement insertStatement = connection.prepareStatement(createInsertStatement(insertType)); + PreparedStatement statement = cachedStatements.putIfAbsent(insertType, insertStatement); + if (statement == null) { + return insertStatement; + } + return statement; + } + + private String createInsertStatement(StatementType insertType){ + StringBuilder insert = new StringBuilder("INSERT INTO "); + insert.append(insertType.getTableName()); + insert.append(" (" + PRIMARY_KEY + "," + COLUMN_NAME + ")"); + insert.append(" VALUES(?,?)"); + return insert.toString(); + } + + private PreparedStatement createAndCacheDeleteStatement(StatementType deleteType) + throws SQLException{ + PreparedStatement deleteStatement = connection.prepareStatement(createDeleteStatement(deleteType)); + PreparedStatement statement = cachedStatements.putIfAbsent(deleteType, deleteStatement); + if (statement == null) { + return deleteStatement; + } + return statement; + } + + private String createDeleteStatement(StatementType deleteType){ + StringBuilder delete = new StringBuilder("DELETE FROM "); + delete.append(deleteType.getTableName()); + delete.append(" WHERE "); + delete.append(PRIMARY_KEY); + delete.append(" = ?"); + return delete.toString(); + } +} From 373193237537a6d149d8d730a59480d2d1fd50f7 Mon Sep 17 00:00:00 2001 From: Carl Camilleri Date: Sun, 19 May 2024 16:06:12 +0200 Subject: [PATCH 2/5] Add HikariCP dependency --- postgrenosql/pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/postgrenosql/pom.xml b/postgrenosql/pom.xml index 069642f4c7..c994446d3d 100644 --- a/postgrenosql/pom.xml +++ b/postgrenosql/pom.xml @@ -61,6 +61,12 @@ LICENSE file. 
slf4j-simple 1.7.13 + + com.zaxxer + HikariCP + 4.0.3 + + From 484e5a3afb8b2ff60bff2cecd8edbed09723d98b Mon Sep 17 00:00:00 2001 From: Carl Camilleri Date: Sun, 19 May 2024 16:06:40 +0200 Subject: [PATCH 3/5] Remove the JDBC connection --- .../site/ycsb/postgrenosql/MultiPostgreNoSQLDBClient.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/postgrenosql/src/main/java/site/ycsb/postgrenosql/MultiPostgreNoSQLDBClient.java b/postgrenosql/src/main/java/site/ycsb/postgrenosql/MultiPostgreNoSQLDBClient.java index 54a60e08cb..7f9782b68c 100644 --- a/postgrenosql/src/main/java/site/ycsb/postgrenosql/MultiPostgreNoSQLDBClient.java +++ b/postgrenosql/src/main/java/site/ycsb/postgrenosql/MultiPostgreNoSQLDBClient.java @@ -48,12 +48,6 @@ public class MultiPostgreNoSQLDBClient extends DB { /** Cache for already prepared statements. */ private static ConcurrentMap cachedStatements; - /** The driver to get the connection to postgresql. */ - private static Driver postgrenosqlDriver; - - /** The connection to the database. */ - private static Connection connection; - /** The class to use as the jdbc driver. 
*/ public static final String DRIVER_CLASS = "db.driver"; From 5402c85245be13adcac0849db1d3fbde22cb1e4b Mon Sep 17 00:00:00 2001 From: Carl Camilleri Date: Sun, 19 May 2024 16:07:07 +0200 Subject: [PATCH 4/5] Remove the statement cache - not compatible with connection pooling --- .../java/site/ycsb/postgrenosql/MultiPostgreNoSQLDBClient.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/postgrenosql/src/main/java/site/ycsb/postgrenosql/MultiPostgreNoSQLDBClient.java b/postgrenosql/src/main/java/site/ycsb/postgrenosql/MultiPostgreNoSQLDBClient.java index 7f9782b68c..20cb6b10d6 100644 --- a/postgrenosql/src/main/java/site/ycsb/postgrenosql/MultiPostgreNoSQLDBClient.java +++ b/postgrenosql/src/main/java/site/ycsb/postgrenosql/MultiPostgreNoSQLDBClient.java @@ -45,9 +45,6 @@ public class MultiPostgreNoSQLDBClient extends DB { /** Count the number of times initialized to teardown on the last. */ private static final AtomicInteger INIT_COUNT = new AtomicInteger(0); - /** Cache for already prepared statements. */ - private static ConcurrentMap cachedStatements; - /** The class to use as the jdbc driver. 
*/ public static final String DRIVER_CLASS = "db.driver"; From 32a2db6bf5f9352f8170626fad08bb2085d04928 Mon Sep 17 00:00:00 2001 From: Carl Camilleri Date: Sun, 19 May 2024 16:31:05 +0200 Subject: [PATCH 5/5] Binding with connection pooling --- .../MultiPostgreNoSQLDBClient.java | 280 ++++++++++-------- 1 file changed, 157 insertions(+), 123 deletions(-) diff --git a/postgrenosql/src/main/java/site/ycsb/postgrenosql/MultiPostgreNoSQLDBClient.java b/postgrenosql/src/main/java/site/ycsb/postgrenosql/MultiPostgreNoSQLDBClient.java index 20cb6b10d6..8667d1fd00 100644 --- a/postgrenosql/src/main/java/site/ycsb/postgrenosql/MultiPostgreNoSQLDBClient.java +++ b/postgrenosql/src/main/java/site/ycsb/postgrenosql/MultiPostgreNoSQLDBClient.java @@ -18,9 +18,10 @@ */ package site.ycsb.postgrenosql; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; import site.ycsb.*; import org.json.simple.JSONObject; -import org.postgresql.Driver; import org.postgresql.util.PGobject; import org.slf4j.Logger; @@ -45,8 +46,12 @@ public class MultiPostgreNoSQLDBClient extends DB { /** Count the number of times initialized to teardown on the last. */ private static final AtomicInteger INIT_COUNT = new AtomicInteger(0); - /** The class to use as the jdbc driver. */ - public static final String DRIVER_CLASS = "db.driver"; + /** The connection pool **/ + private static final HikariConfig hikariConfig = new HikariConfig(); + private static HikariDataSource hikariDataSource; + + /** Cache for already generated SQL statements. */ + private static final ConcurrentMap cachedSQLStatements = new ConcurrentHashMap<>(); /** The URL to connect to the database. */ public static final String CONNECTION_URL = "postgrenosql.url"; @@ -60,6 +65,10 @@ public class MultiPostgreNoSQLDBClient extends DB { /** The JDBC connection auto-commit property for the driver. 
*/ public static final String JDBC_AUTO_COMMIT = "postgrenosql.autocommit"; + public static final String CONNECTION_POOL_SIZE = "postgrenosql.connection_pool_size"; + public static final String CONNECTION_TIMEOUT = "postgrenosql.connection_timeout"; + public static final String CONNECTION_IDLE_TIMEOUT = "postgrenosql.connection_idle_timeout"; + /** The primary key in the user table. */ public static final String PRIMARY_KEY = "YCSB_KEY"; @@ -77,11 +86,16 @@ private static boolean getBoolProperty(Properties props, String key, boolean def return defaultVal; } + static int parseIntegerProperty(final Properties properties, final String key, final int defaultValue) { + final String value = properties.getProperty(key); + return value == null ? defaultValue : Integer.parseInt(value); + } + @Override - public void init() throws DBException { + public void init() { INIT_COUNT.incrementAndGet(); synchronized (MultiPostgreNoSQLDBClient.class) { - if (postgrenosqlDriver != null) { + if (hikariDataSource != null) { return; } @@ -89,18 +103,29 @@ public void init() throws DBException { String urls = props.getProperty(CONNECTION_URL, DEFAULT_PROP); String user = props.getProperty(CONNECTION_USER, DEFAULT_PROP); String passwd = props.getProperty(CONNECTION_PASSWD, DEFAULT_PROP); + int connectionPoolSize = parseIntegerProperty(props,CONNECTION_POOL_SIZE, 0); + int connectionTimeout = parseIntegerProperty(props,CONNECTION_TIMEOUT, 0); + int connectionIdleTimeout = parseIntegerProperty(props,CONNECTION_IDLE_TIMEOUT, 0); boolean autoCommit = getBoolProperty(props, JDBC_AUTO_COMMIT, true); try { - Properties tmpProps = new Properties(); - tmpProps.setProperty("user", user); - tmpProps.setProperty("password", passwd); - cachedStatements = new ConcurrentHashMap<>(); + hikariConfig.setJdbcUrl(urls); + hikariConfig.setUsername(user); + hikariConfig.setPassword(passwd); + if(connectionPoolSize>0) { + hikariConfig.setMaximumPoolSize(connectionPoolSize); + } + + if(connectionIdleTimeout>0) { 
+ hikariConfig.setIdleTimeout(connectionIdleTimeout); + } - postgrenosqlDriver = new Driver(); - connection = postgrenosqlDriver.connect(urls, tmpProps); - connection.setAutoCommit(autoCommit); + if(connectionTimeout>0) { + hikariConfig.setConnectionTimeout(connectionTimeout); + } + hikariConfig.setAutoCommit(autoCommit); + hikariDataSource = new HikariDataSource(hikariConfig); } catch (Exception e) { LOG.error("Error during initialization: " + e); @@ -109,30 +134,18 @@ } @Override - public void cleanup() throws DBException { + public void cleanup() { if (INIT_COUNT.decrementAndGet() == 0) { - try { - cachedStatements.clear(); - - if (!connection.getAutoCommit()){ - connection.commit(); - } - connection.close(); - } catch (SQLException e) { - System.err.println("Error in cleanup execution. " + e); - } - postgrenosqlDriver = null; + hikariDataSource.close(); } } @Override public Status read(String tableName, String key, Set fields, Map result) { + PreparedStatement readStatement=null; try { StatementType type = new StatementType(StatementType.Type.READ, tableName, fields); - PreparedStatement readStatement = cachedStatements.get(type); - if (readStatement == null) { - readStatement = createAndCacheReadStatement(type); - } + readStatement = createAndCacheReadStatement(type); readStatement.setString(1, key); ResultSet resultSet = readStatement.executeQuery(); if (!resultSet.next()) { @@ -160,24 +173,35 @@ public Status read(String tableName, String key, Set fields, Map fields, Vector> result) { + PreparedStatement scanStatement = null; try { StatementType type = new StatementType(StatementType.Type.SCAN, tableName, fields); - PreparedStatement scanStatement = cachedStatements.get(type); - if (scanStatement == null) { - scanStatement = createAndCacheScanStatement(type); - } + scanStatement = createAndCacheScanStatement(type); scanStatement.setString(1, startKey); scanStatement.setInt(2, recordcount); ResultSet resultSet = 
scanStatement.executeQuery(); for (int i = 0; i < recordcount && resultSet.next(); i++) { if (result != null && fields != null) { - HashMap values = new HashMap(); + HashMap values = new HashMap<>(); for (String field : fields) { String value = resultSet.getString(field); values.put(field, new StringByteIterator(value)); @@ -192,17 +216,19 @@ public Status scan(String tableName, String startKey, int recordcount, Set values) { + PreparedStatement updateStatement = null; try{ StatementType type = new StatementType(StatementType.Type.UPDATE, tableName, null); - PreparedStatement updateStatement = cachedStatements.get(type); - if (updateStatement == null) { - updateStatement = createAndCacheUpdateStatement(type); - } + updateStatement = createAndCacheUpdateStatement(type); JSONObject jsonObject = new JSONObject(); for (Map.Entry entry : values.entrySet()) { @@ -224,17 +250,17 @@ public Status update(String tableName, String key, Map val } catch (SQLException e) { LOG.error("Error in processing update to table: " + tableName + e); return Status.ERROR; + } finally { + disposePreparedStatement(updateStatement); } } @Override public Status insert(String tableName, String key, Map values) { + PreparedStatement insertStatement=null; try{ StatementType type = new StatementType(StatementType.Type.INSERT, tableName, null); - PreparedStatement insertStatement = cachedStatements.get(type); - if (insertStatement == null) { - insertStatement = createAndCacheInsertStatement(type); - } + insertStatement = createAndCacheInsertStatement(type); JSONObject jsonObject = new JSONObject(); for (Map.Entry entry : values.entrySet()) { @@ -257,17 +283,17 @@ public Status insert(String tableName, String key, Map val } catch (SQLException e) { LOG.error("Error in processing insert to table: " + tableName + ": " + e); return Status.ERROR; + } finally { + disposePreparedStatement(insertStatement); } } @Override public Status delete(String tableName, String key) { + PreparedStatement 
deleteStatement=null; try{ StatementType type = new StatementType(StatementType.Type.DELETE, tableName, null); - PreparedStatement deleteStatement = cachedStatements.get(type); - if (deleteStatement == null) { - deleteStatement = createAndCacheDeleteStatement(type); - } + deleteStatement = createAndCacheDeleteStatement(type); deleteStatement.setString(1, key); int result = deleteStatement.executeUpdate(); @@ -279,122 +305,130 @@ public Status delete(String tableName, String key) { } catch (SQLException e) { LOG.error("Error in processing delete to table: " + tableName + e); return Status.ERROR; + } finally { + disposePreparedStatement(deleteStatement); } } private PreparedStatement createAndCacheReadStatement(StatementType readType) throws SQLException{ - PreparedStatement readStatement = connection.prepareStatement(createReadStatement(readType)); - PreparedStatement statement = cachedStatements.putIfAbsent(readType, readStatement); - if (statement == null) { - return readStatement; - } - return statement; + Connection connection = hikariDataSource.getConnection(); + return connection.prepareStatement(createReadStatement(readType)); } - private String createReadStatement(StatementType readType){ - StringBuilder read = new StringBuilder("SELECT " + PRIMARY_KEY + " AS " + PRIMARY_KEY); + private String createReadStatement(StatementType readType) { + + String res = cachedSQLStatements.get(readType); + if (res == null) { + StringBuilder read = new StringBuilder("SELECT " + PRIMARY_KEY + " AS " + PRIMARY_KEY); - if (readType.getFields() == null) { - read.append(", (jsonb_each_text(" + COLUMN_NAME + ")).*"); - } else { - for (String field:readType.getFields()){ - read.append(", " + COLUMN_NAME + "->>'" + field + "' AS " + field); + if (readType.getFields() == null) { + read.append(", (jsonb_each_text(" + COLUMN_NAME + ")).*"); + } else { + for (String field : readType.getFields()) { + read.append(", " + COLUMN_NAME + "->>'" + field + "' AS " + field); + } } - } - 
read.append(" FROM " + readType.getTableName()); - read.append(" WHERE "); - read.append(PRIMARY_KEY); - read.append(" = "); - read.append("?"); - return read.toString(); + read.append(" FROM " + readType.getTableName()); + read.append(" WHERE "); + read.append(PRIMARY_KEY); + read.append(" = "); + read.append("?"); + cachedSQLStatements.putIfAbsent(readType, read.toString()); + res = cachedSQLStatements.get(readType); + } + return res; } private PreparedStatement createAndCacheScanStatement(StatementType scanType) throws SQLException{ - PreparedStatement scanStatement = connection.prepareStatement(createScanStatement(scanType)); - PreparedStatement statement = cachedStatements.putIfAbsent(scanType, scanStatement); - if (statement == null) { - return scanStatement; - } - return statement; + + Connection connection = hikariDataSource.getConnection(); + return connection.prepareStatement(createScanStatement(scanType)); } private String createScanStatement(StatementType scanType){ - StringBuilder scan = new StringBuilder("SELECT " + PRIMARY_KEY + " AS " + PRIMARY_KEY); - if (scanType.getFields() != null){ - for (String field:scanType.getFields()){ - scan.append(", " + COLUMN_NAME + "->>'" + field + "' AS " + field); + String res = cachedSQLStatements.get(scanType); + if (res == null) { + StringBuilder scan = new StringBuilder("SELECT " + PRIMARY_KEY + " AS " + PRIMARY_KEY); + if (scanType.getFields() != null) { + for (String field : scanType.getFields()) { + scan.append(", " + COLUMN_NAME + "->>'" + field + "' AS " + field); + } } + scan.append(" FROM " + scanType.getTableName()); + scan.append(" WHERE "); + scan.append(PRIMARY_KEY); + scan.append(" >= ?"); + scan.append(" ORDER BY "); + scan.append(PRIMARY_KEY); + scan.append(" LIMIT ?"); + cachedSQLStatements.putIfAbsent(scanType,scan.toString()); + res = cachedSQLStatements.get(scanType); } - scan.append(" FROM " + scanType.getTableName()); - scan.append(" WHERE "); - scan.append(PRIMARY_KEY); - scan.append(" >= 
?"); - scan.append(" ORDER BY "); - scan.append(PRIMARY_KEY); - scan.append(" LIMIT ?"); - - return scan.toString(); + return res; } public PreparedStatement createAndCacheUpdateStatement(StatementType updateType) throws SQLException{ - PreparedStatement updateStatement = connection.prepareStatement(createUpdateStatement(updateType)); - PreparedStatement statement = cachedStatements.putIfAbsent(updateType, updateStatement); - if (statement == null) { - return updateStatement; - } - return statement; + Connection connection = hikariDataSource.getConnection(); + return connection.prepareStatement(createUpdateStatement(updateType)); } private String createUpdateStatement(StatementType updateType){ - StringBuilder update = new StringBuilder("UPDATE "); - update.append(updateType.getTableName()); - update.append(" SET "); - update.append(COLUMN_NAME + " = " + COLUMN_NAME); - update.append(" || ? "); - update.append(" WHERE "); - update.append(PRIMARY_KEY); - update.append(" = ?"); - return update.toString(); + String res = cachedSQLStatements.get(updateType); + if (res == null) { + StringBuilder update = new StringBuilder("UPDATE "); + update.append(updateType.getTableName()); + update.append(" SET "); + update.append(COLUMN_NAME + " = " + COLUMN_NAME); + update.append(" || ? 
"); + update.append(" WHERE "); + update.append(PRIMARY_KEY); + update.append(" = ?"); + cachedSQLStatements.putIfAbsent(updateType,update.toString()); + res = cachedSQLStatements.get(updateType); + } + return res; } private PreparedStatement createAndCacheInsertStatement(StatementType insertType) throws SQLException{ - PreparedStatement insertStatement = connection.prepareStatement(createInsertStatement(insertType)); - PreparedStatement statement = cachedStatements.putIfAbsent(insertType, insertStatement); - if (statement == null) { - return insertStatement; - } - return statement; + Connection connection = hikariDataSource.getConnection(); + return connection.prepareStatement(createInsertStatement(insertType)); } private String createInsertStatement(StatementType insertType){ - StringBuilder insert = new StringBuilder("INSERT INTO "); - insert.append(insertType.getTableName()); - insert.append(" (" + PRIMARY_KEY + "," + COLUMN_NAME + ")"); - insert.append(" VALUES(?,?)"); - return insert.toString(); + String res = cachedSQLStatements.get(insertType); + if (res == null) { + StringBuilder insert = new StringBuilder("INSERT INTO "); + insert.append(insertType.getTableName()); + insert.append(" (" + PRIMARY_KEY + "," + COLUMN_NAME + ")"); + insert.append(" VALUES(?,?)"); + cachedSQLStatements.putIfAbsent(insertType,insert.toString()); + res = cachedSQLStatements.get(insertType); + } + return res; } private PreparedStatement createAndCacheDeleteStatement(StatementType deleteType) throws SQLException{ - PreparedStatement deleteStatement = connection.prepareStatement(createDeleteStatement(deleteType)); - PreparedStatement statement = cachedStatements.putIfAbsent(deleteType, deleteStatement); - if (statement == null) { - return deleteStatement; - } - return statement; + Connection connection = hikariDataSource.getConnection(); + return connection.prepareStatement(createDeleteStatement(deleteType)); } - private String createDeleteStatement(StatementType deleteType){ - 
StringBuilder delete = new StringBuilder("DELETE FROM "); - delete.append(deleteType.getTableName()); - delete.append(" WHERE "); - delete.append(PRIMARY_KEY); - delete.append(" = ?"); - return delete.toString(); + private String createDeleteStatement(StatementType deleteType) { + String res = cachedSQLStatements.get(deleteType); + if (res == null) { + StringBuilder delete = new StringBuilder("DELETE FROM "); + delete.append(deleteType.getTableName()); + delete.append(" WHERE "); + delete.append(PRIMARY_KEY); + delete.append(" = ?"); + cachedSQLStatements.putIfAbsent(deleteType, delete.toString()); + res = cachedSQLStatements.get(deleteType); + } + return res; } }