From e6509c71ac40f87a302fa011318645e09279a5d8 Mon Sep 17 00:00:00 2001 From: ruanwenjun Date: Thu, 10 Jul 2025 19:58:55 +0800 Subject: [PATCH 1/2] Use stmtHandleChangeLock in KyuubiStatement to ensure thread safe access and updates to stmtHandle. --- .../kyuubi/jdbc/hive/KyuubiConnection.java | 2 +- .../kyuubi/jdbc/hive/KyuubiStatement.java | 109 ++++++++++-------- .../kyuubi/jdbc/hive/KyuubiStatementTest.java | 29 +++++ 3 files changed, 93 insertions(+), 47 deletions(-) diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java index dc2b7afee3d..5fda3006c80 100644 --- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java @@ -88,7 +88,7 @@ public class KyuubiConnection implements SQLConnection, KyuubiLoggable { private JdbcConnectionParams connParams; private TTransport transport; private TCLIService.Iface client; - private boolean isClosed = true; + private volatile boolean isClosed = true; private SQLWarning warningChain = null; private TSessionHandle sessHandle = null; private final List supportedProtocols = new LinkedList<>(); diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java index 23f0a1f43d7..42c04e7200e 100644 --- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java @@ -20,6 +20,8 @@ import com.google.common.annotations.VisibleForTesting; import java.sql.*; import java.util.*; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.lang3.StringUtils; import org.apache.kyuubi.jdbc.hive.adapter.SQLStatement; import org.apache.kyuubi.jdbc.hive.cli.FetchType; @@ -40,7 +42,10 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable { public static final String DEFAULT_ARROW_TIMESTAMP_AS_STRING = "false"; private final KyuubiConnection connection; private TCLIService.Iface client; - private TOperationHandle stmtHandle = null; + private volatile TOperationHandle stmtHandle = null; + // This lock must be acquired before modifying or judge stmt + // to ensure there are no concurrent accesses or race conditions. + private final Lock stmtHandleChangeLock = new ReentrantLock(); private final TSessionHandle sessHandle; Map sessConf = new HashMap<>(); private int fetchSize = DEFAULT_FETCH_SIZE; @@ -67,10 +72,10 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable { private SQLWarning warningChain = null; /** Keep state so we can fail certain calls made after close(). */ - private boolean isClosed = false; + private volatile boolean isClosed = false; /** Keep state so we can fail certain calls made after cancel(). */ - private boolean isCancelled = false; + private volatile boolean isCancelled = false; /** Keep this state so we can know whether the query in this statement is closed. */ private boolean isQueryClosed = false; @@ -124,23 +129,25 @@ public KyuubiStatement( @Override public void cancel() throws SQLException { - checkConnection("cancel"); - if (isCancelled) { - return; - } - try { + checkConnection("cancel"); + if (isCancelled) { + return; + } + stmtHandleChangeLock.lock(); if (stmtHandle != null) { TCancelOperationReq cancelReq = new TCancelOperationReq(stmtHandle); TCancelOperationResp cancelResp = client.CancelOperation(cancelReq); Utils.verifySuccessWithInfo(cancelResp.getStatus()); } + isCancelled = true; } catch (SQLException e) { throw e; } catch (Exception e) { throw new KyuubiSQLException(e.toString(), "08S01", e); + } finally { + stmtHandleChangeLock.unlock(); } - isCancelled = true; } @Override @@ -177,13 +184,18 @@ void closeClientOperation() throws SQLException { @Override public void close() throws SQLException { - if (isClosed) { - return; + try { + stmtHandleChangeLock.lock(); + if (isClosed) { + return; + } + closeClientOperation(); + client = null; + closeResultSet(); + isClosed = true; + } finally { + stmtHandleChangeLock.unlock(); } - closeClientOperation(); - client = null; - closeResultSet(); - isClosed = true; } @Override @@ -312,38 +324,43 @@ private void runAsyncOnServer(String sql) throws SQLException { } private void runAsyncOnServer(String sql, Map confOneTime) throws SQLException { - checkConnection("execute"); - - reInitState(); - - TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, sql); - /** - * Run asynchronously whenever possible Currently only a SQLOperation can be run asynchronously, - * in a background operation thread Compilation can run asynchronously or synchronously and - * execution run asynchronously - */ - execReq.setRunAsync(true); - if (confOneTime != null) { - Map confOverlay = new HashMap(sessConf); - confOverlay.putAll(confOneTime); - execReq.setConfOverlay(confOverlay); - } else { - execReq.setConfOverlay(sessConf); - } - execReq.setQueryTimeout(queryTimeout); + stmtHandleChangeLock.lock(); try { - TExecuteStatementResp execResp = client.ExecuteStatement(execReq); - Utils.verifySuccessWithInfo(execResp.getStatus()); - stmtHandle = execResp.getOperationHandle(); - isExecuteStatementFailed = false; - } catch (SQLException eS) { - isExecuteStatementFailed = true; - isLogBeingGenerated = false; - throw eS; - } catch (Exception ex) { - isExecuteStatementFailed = true; - isLogBeingGenerated = false; - throw new KyuubiSQLException(ex.toString(), "08S01", ex); + checkConnection("execute"); + + reInitState(); + + TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, sql); + /** + * Run asynchronously whenever possible Currently only a SQLOperation can be run + * asynchronously, in a background operation thread Compilation can run asynchronously or + * synchronously and execution run asynchronously + */ + execReq.setRunAsync(true); + if (confOneTime != null) { + Map confOverlay = new HashMap<>(sessConf); + confOverlay.putAll(confOneTime); + execReq.setConfOverlay(confOverlay); + } else { + execReq.setConfOverlay(sessConf); + } + execReq.setQueryTimeout(queryTimeout); + try { + TExecuteStatementResp execResp = client.ExecuteStatement(execReq); + Utils.verifySuccessWithInfo(execResp.getStatus()); + stmtHandle = execResp.getOperationHandle(); + isExecuteStatementFailed = false; + } catch (SQLException eS) { + isExecuteStatementFailed = true; + isLogBeingGenerated = false; + throw eS; + } catch (Exception ex) { + isExecuteStatementFailed = true; + isLogBeingGenerated = false; + throw new KyuubiSQLException(ex.toString(), "08S01", ex); + } + } finally { + stmtHandleChangeLock.unlock(); } } diff --git a/kyuubi-hive-jdbc/src/test/java/org/apache/kyuubi/jdbc/hive/KyuubiStatementTest.java b/kyuubi-hive-jdbc/src/test/java/org/apache/kyuubi/jdbc/hive/KyuubiStatementTest.java index 04dc40e5c0b..a4c1f9790c4 100644 --- a/kyuubi-hive-jdbc/src/test/java/org/apache/kyuubi/jdbc/hive/KyuubiStatementTest.java +++ b/kyuubi-hive-jdbc/src/test/java/org/apache/kyuubi/jdbc/hive/KyuubiStatementTest.java @@ -18,8 +18,12 @@ package org.apache.kyuubi.jdbc.hive; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import java.sql.SQLException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.junit.Assert; import org.junit.Test; public class KyuubiStatementTest { @@ -54,4 +58,29 @@ public void testaddBatch() throws SQLException { stmt.addBatch(null); } } + + @Test + public void testThrowKyuubiSQLExceptionWhenExecuteSqlOnClosedStmt() throws SQLException { + KyuubiStatement stmt = new KyuubiStatement(null, null, null); + try { + ExecutorService executorService = Executors.newFixedThreadPool(2); + executorService.submit( + () -> { + try { + stmt.close(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + }); + executorService.submit( + () -> { + Assert.assertEquals( + "Can't exectue after statement has been closed", + assertThrows(KyuubiSQLException.class, () -> stmt.execute("SELECT 1")) + .getMessage()); + }); + } finally { + stmt.close(); + } + } } From ce54a5d46decee7607be35a2d75af4083ee6a43f Mon Sep 17 00:00:00 2001 From: ruanwenjun Date: Mon, 14 Jul 2025 15:42:50 +0800 Subject: [PATCH 2/2] Move stmtHandleChangeLock to the head when cancel KyuubiStatement --- .../kyuubi/jdbc/hive/KyuubiStatement.java | 14 ++-- .../kyuubi/jdbc/hive/KyuubiStatementTest.java | 65 ++++++++++++------- 2 files changed, 48 insertions(+), 31 deletions(-) diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java index 42c04e7200e..9cf787d6963 100644 --- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java @@ -45,7 +45,7 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable { private volatile TOperationHandle stmtHandle = null; // This lock must be acquired before modifying or judge stmt // to ensure there are no concurrent accesses or race conditions. - private final Lock stmtHandleChangeLock = new ReentrantLock(); + private final Lock stmtHandleAccessLock = new ReentrantLock(); private final TSessionHandle sessHandle; Map sessConf = new HashMap<>(); private int fetchSize = DEFAULT_FETCH_SIZE; @@ -130,11 +130,11 @@ public KyuubiStatement( @Override public void cancel() throws SQLException { try { + stmtHandleAccessLock.lock(); checkConnection("cancel"); if (isCancelled) { return; } - stmtHandleChangeLock.lock(); if (stmtHandle != null) { TCancelOperationReq cancelReq = new TCancelOperationReq(stmtHandle); TCancelOperationResp cancelResp = client.CancelOperation(cancelReq); @@ -146,7 +146,7 @@ public void cancel() throws SQLException { } catch (Exception e) { throw new KyuubiSQLException(e.toString(), "08S01", e); } finally { - stmtHandleChangeLock.unlock(); + stmtHandleAccessLock.unlock(); } } @@ -185,7 +185,7 @@ void closeClientOperation() throws SQLException { @Override public void close() throws SQLException { try { - stmtHandleChangeLock.lock(); + stmtHandleAccessLock.lock(); if (isClosed) { return; } @@ -194,7 +194,7 @@ public void close() throws SQLException { closeResultSet(); isClosed = true; } finally { - stmtHandleChangeLock.unlock(); + stmtHandleAccessLock.unlock(); } } @@ -324,7 +324,7 @@ private void runAsyncOnServer(String sql) throws SQLException { } private void runAsyncOnServer(String sql, Map confOneTime) throws SQLException { - stmtHandleChangeLock.lock(); + stmtHandleAccessLock.lock(); try { checkConnection("execute"); @@ -360,7 +360,7 @@ private void runAsyncOnServer(String sql, Map confOneTime) throw throw new KyuubiSQLException(ex.toString(), "08S01", ex); } } finally { - stmtHandleChangeLock.unlock(); + stmtHandleAccessLock.unlock(); } } diff --git a/kyuubi-hive-jdbc/src/test/java/org/apache/kyuubi/jdbc/hive/KyuubiStatementTest.java b/kyuubi-hive-jdbc/src/test/java/org/apache/kyuubi/jdbc/hive/KyuubiStatementTest.java index a4c1f9790c4..3935157c30b 100644 --- a/kyuubi-hive-jdbc/src/test/java/org/apache/kyuubi/jdbc/hive/KyuubiStatementTest.java +++ b/kyuubi-hive-jdbc/src/test/java/org/apache/kyuubi/jdbc/hive/KyuubiStatementTest.java @@ -21,9 +21,8 @@ import static org.junit.Assert.assertThrows; import java.sql.SQLException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import org.junit.Assert; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; public class KyuubiStatementTest { @@ -60,27 +59,45 @@ public void testaddBatch() throws SQLException { } @Test - public void testThrowKyuubiSQLExceptionWhenExecuteSqlOnClosedStmt() throws SQLException { - KyuubiStatement stmt = new KyuubiStatement(null, null, null); - try { - ExecutorService executorService = Executors.newFixedThreadPool(2); - executorService.submit( - () -> { - try { - stmt.close(); - } catch (SQLException e) { - throw new RuntimeException(e); - } - }); - executorService.submit( - () -> { - Assert.assertEquals( - "Can't exectue after statement has been closed", - assertThrows(KyuubiSQLException.class, () -> stmt.execute("SELECT 1")) - .getMessage()); - }); - } finally { - stmt.close(); + public void testThrowKyuubiSQLExceptionWhenExecuteSqlOnClosedStmt() + throws SQLException, InterruptedException { + try (KyuubiStatement stmt = new KyuubiStatement(null, null, null)) { + AtomicReference assertionFailure = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + + Thread thread1 = + new Thread( + () -> { + try { + latch.countDown(); + stmt.close(); + } catch (SQLException e) { + assertionFailure.set(e); + } + }); + + Thread thread2 = + new Thread( + () -> { + try { + latch.await(); + KyuubiSQLException ex = + assertThrows(KyuubiSQLException.class, () -> stmt.execute("SELECT 1")); + assertEquals("Can't execute after statement has been closed", ex.getMessage()); + } catch (Throwable t) { + assertionFailure.set(t); + } + }); + + thread1.start(); + thread2.start(); + + thread1.join(); + thread2.join(); + + if (assertionFailure.get() != null) { + throw new AssertionError("Assertion failed in thread", assertionFailure.get()); + } } } }