Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public class KyuubiConnection implements SQLConnection, KyuubiLoggable {
private JdbcConnectionParams connParams;
private TTransport transport;
private TCLIService.Iface client;
private boolean isClosed = true;
private volatile boolean isClosed = true;
private SQLWarning warningChain = null;
private TSessionHandle sessHandle = null;
private final List<TProtocolVersion> supportedProtocols = new LinkedList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.google.common.annotations.VisibleForTesting;
import java.sql.*;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.kyuubi.jdbc.hive.adapter.SQLStatement;
import org.apache.kyuubi.jdbc.hive.cli.FetchType;
Expand All @@ -40,7 +42,10 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
public static final String DEFAULT_ARROW_TIMESTAMP_AS_STRING = "false";
private final KyuubiConnection connection;
private TCLIService.Iface client;
private TOperationHandle stmtHandle = null;
private volatile TOperationHandle stmtHandle = null;
// This lock must be acquired before modifying or judge stmt
// to ensure there are no concurrent accesses or race conditions.
private final Lock stmtHandleAccessLock = new ReentrantLock();
private final TSessionHandle sessHandle;
Map<String, String> sessConf = new HashMap<>();
private int fetchSize = DEFAULT_FETCH_SIZE;
Expand All @@ -67,10 +72,10 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
private SQLWarning warningChain = null;

/** Keep state so we can fail certain calls made after close(). */
private boolean isClosed = false;
private volatile boolean isClosed = false;

/** Keep state so we can fail certain calls made after cancel(). */
private boolean isCancelled = false;
private volatile boolean isCancelled = false;

/** Keep this state so we can know whether the query in this statement is closed. */
private boolean isQueryClosed = false;
Expand Down Expand Up @@ -124,23 +129,25 @@ public KyuubiStatement(

@Override
public void cancel() throws SQLException {
checkConnection("cancel");
if (isCancelled) {
return;
}

try {
stmtHandleAccessLock.lock();
checkConnection("cancel");
if (isCancelled) {
return;
}
if (stmtHandle != null) {
TCancelOperationReq cancelReq = new TCancelOperationReq(stmtHandle);
TCancelOperationResp cancelResp = client.CancelOperation(cancelReq);
Utils.verifySuccessWithInfo(cancelResp.getStatus());
}
isCancelled = true;
} catch (SQLException e) {
throw e;
} catch (Exception e) {
throw new KyuubiSQLException(e.toString(), "08S01", e);
} finally {
stmtHandleAccessLock.unlock();
}
isCancelled = true;
}

@Override
Expand Down Expand Up @@ -177,13 +184,18 @@ void closeClientOperation() throws SQLException {

@Override
public void close() throws SQLException {
if (isClosed) {
return;
try {
stmtHandleAccessLock.lock();
if (isClosed) {
return;
}
closeClientOperation();
client = null;
closeResultSet();
isClosed = true;
} finally {
stmtHandleAccessLock.unlock();
}
closeClientOperation();
client = null;
closeResultSet();
isClosed = true;
}

@Override
Expand Down Expand Up @@ -312,38 +324,43 @@ private void runAsyncOnServer(String sql) throws SQLException {
}

private void runAsyncOnServer(String sql, Map<String, String> confOneTime) throws SQLException {
checkConnection("execute");

reInitState();

TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, sql);
/**
* Run asynchronously whenever possible Currently only a SQLOperation can be run asynchronously,
* in a background operation thread Compilation can run asynchronously or synchronously and
* execution run asynchronously
*/
execReq.setRunAsync(true);
if (confOneTime != null) {
Map<String, String> confOverlay = new HashMap<String, String>(sessConf);
confOverlay.putAll(confOneTime);
execReq.setConfOverlay(confOverlay);
} else {
execReq.setConfOverlay(sessConf);
}
execReq.setQueryTimeout(queryTimeout);
stmtHandleAccessLock.lock();
try {
TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
Utils.verifySuccessWithInfo(execResp.getStatus());
stmtHandle = execResp.getOperationHandle();
isExecuteStatementFailed = false;
} catch (SQLException eS) {
isExecuteStatementFailed = true;
isLogBeingGenerated = false;
throw eS;
} catch (Exception ex) {
isExecuteStatementFailed = true;
isLogBeingGenerated = false;
throw new KyuubiSQLException(ex.toString(), "08S01", ex);
checkConnection("execute");

reInitState();

TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, sql);
/**
* Run asynchronously whenever possible Currently only a SQLOperation can be run
* asynchronously, in a background operation thread Compilation can run asynchronously or
* synchronously and execution run asynchronously
*/
execReq.setRunAsync(true);
if (confOneTime != null) {
Map<String, String> confOverlay = new HashMap<>(sessConf);
confOverlay.putAll(confOneTime);
execReq.setConfOverlay(confOverlay);
} else {
execReq.setConfOverlay(sessConf);
}
execReq.setQueryTimeout(queryTimeout);
try {
TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
Utils.verifySuccessWithInfo(execResp.getStatus());
stmtHandle = execResp.getOperationHandle();
isExecuteStatementFailed = false;
} catch (SQLException eS) {
isExecuteStatementFailed = true;
isLogBeingGenerated = false;
throw eS;
} catch (Exception ex) {
isExecuteStatementFailed = true;
isLogBeingGenerated = false;
throw new KyuubiSQLException(ex.toString(), "08S01", ex);
}
} finally {
stmtHandleAccessLock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package org.apache.kyuubi.jdbc.hive;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import java.sql.SQLException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;

public class KyuubiStatementTest {
Expand Down Expand Up @@ -54,4 +57,47 @@ public void testaddBatch() throws SQLException {
stmt.addBatch(null);
}
}

@Test
public void testThrowKyuubiSQLExceptionWhenExecuteSqlOnClosedStmt()
throws SQLException, InterruptedException {
try (KyuubiStatement stmt = new KyuubiStatement(null, null, null)) {
AtomicReference<Throwable> assertionFailure = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);

Thread thread1 =
new Thread(
() -> {
try {
latch.countDown();
stmt.close();
} catch (SQLException e) {
assertionFailure.set(e);
}
});

Thread thread2 =
new Thread(
() -> {
try {
latch.await();
KyuubiSQLException ex =
assertThrows(KyuubiSQLException.class, () -> stmt.execute("SELECT 1"));
assertEquals("Can't execute after statement has been closed", ex.getMessage());
} catch (Throwable t) {
assertionFailure.set(t);
}
});

thread1.start();
thread2.start();

thread1.join();
thread2.join();

if (assertionFailure.get() != null) {
throw new AssertionError("Assertion failed in thread", assertionFailure.get());
}
}
}
}
Loading