Skip to content

Commit ce54a5d

Browse files
ruanwenjunruanwenjun
authored andcommitted
Move stmtHandleChangeLock to the head when cancel KyuubiStatement
1 parent e6509c7 commit ce54a5d

File tree

2 files changed

+48
-31
lines changed

2 files changed

+48
-31
lines changed

kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
4545
private volatile TOperationHandle stmtHandle = null;
4646
// This lock must be acquired before modifying or judge stmt
4747
// to ensure there are no concurrent accesses or race conditions.
48-
private final Lock stmtHandleChangeLock = new ReentrantLock();
48+
private final Lock stmtHandleAccessLock = new ReentrantLock();
4949
private final TSessionHandle sessHandle;
5050
Map<String, String> sessConf = new HashMap<>();
5151
private int fetchSize = DEFAULT_FETCH_SIZE;
@@ -130,11 +130,11 @@ public KyuubiStatement(
130130
@Override
131131
public void cancel() throws SQLException {
132132
try {
133+
stmtHandleAccessLock.lock();
133134
checkConnection("cancel");
134135
if (isCancelled) {
135136
return;
136137
}
137-
stmtHandleChangeLock.lock();
138138
if (stmtHandle != null) {
139139
TCancelOperationReq cancelReq = new TCancelOperationReq(stmtHandle);
140140
TCancelOperationResp cancelResp = client.CancelOperation(cancelReq);
@@ -146,7 +146,7 @@ public void cancel() throws SQLException {
146146
} catch (Exception e) {
147147
throw new KyuubiSQLException(e.toString(), "08S01", e);
148148
} finally {
149-
stmtHandleChangeLock.unlock();
149+
stmtHandleAccessLock.unlock();
150150
}
151151
}
152152

@@ -185,7 +185,7 @@ void closeClientOperation() throws SQLException {
185185
@Override
186186
public void close() throws SQLException {
187187
try {
188-
stmtHandleChangeLock.lock();
188+
stmtHandleAccessLock.lock();
189189
if (isClosed) {
190190
return;
191191
}
@@ -194,7 +194,7 @@ public void close() throws SQLException {
194194
closeResultSet();
195195
isClosed = true;
196196
} finally {
197-
stmtHandleChangeLock.unlock();
197+
stmtHandleAccessLock.unlock();
198198
}
199199
}
200200

@@ -324,7 +324,7 @@ private void runAsyncOnServer(String sql) throws SQLException {
324324
}
325325

326326
private void runAsyncOnServer(String sql, Map<String, String> confOneTime) throws SQLException {
327-
stmtHandleChangeLock.lock();
327+
stmtHandleAccessLock.lock();
328328
try {
329329
checkConnection("execute");
330330

@@ -360,7 +360,7 @@ private void runAsyncOnServer(String sql, Map<String, String> confOneTime) throw
360360
throw new KyuubiSQLException(ex.toString(), "08S01", ex);
361361
}
362362
} finally {
363-
stmtHandleChangeLock.unlock();
363+
stmtHandleAccessLock.unlock();
364364
}
365365
}
366366

kyuubi-hive-jdbc/src/test/java/org/apache/kyuubi/jdbc/hive/KyuubiStatementTest.java

Lines changed: 41 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,8 @@
2121
import static org.junit.Assert.assertThrows;
2222

2323
import java.sql.SQLException;
24-
import java.util.concurrent.ExecutorService;
25-
import java.util.concurrent.Executors;
26-
import org.junit.Assert;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.atomic.AtomicReference;
2726
import org.junit.Test;
2827

2928
public class KyuubiStatementTest {
@@ -60,27 +59,45 @@ public void testaddBatch() throws SQLException {
6059
}
6160

6261
@Test
63-
public void testThrowKyuubiSQLExceptionWhenExecuteSqlOnClosedStmt() throws SQLException {
64-
KyuubiStatement stmt = new KyuubiStatement(null, null, null);
65-
try {
66-
ExecutorService executorService = Executors.newFixedThreadPool(2);
67-
executorService.submit(
68-
() -> {
69-
try {
70-
stmt.close();
71-
} catch (SQLException e) {
72-
throw new RuntimeException(e);
73-
}
74-
});
75-
executorService.submit(
76-
() -> {
77-
Assert.assertEquals(
78-
"Can't exectue after statement has been closed",
79-
assertThrows(KyuubiSQLException.class, () -> stmt.execute("SELECT 1"))
80-
.getMessage());
81-
});
82-
} finally {
83-
stmt.close();
62+
public void testThrowKyuubiSQLExceptionWhenExecuteSqlOnClosedStmt()
63+
throws SQLException, InterruptedException {
64+
try (KyuubiStatement stmt = new KyuubiStatement(null, null, null)) {
65+
AtomicReference<Throwable> assertionFailure = new AtomicReference<>();
66+
CountDownLatch latch = new CountDownLatch(1);
67+
68+
Thread thread1 =
69+
new Thread(
70+
() -> {
71+
try {
72+
latch.countDown();
73+
stmt.close();
74+
} catch (SQLException e) {
75+
assertionFailure.set(e);
76+
}
77+
});
78+
79+
Thread thread2 =
80+
new Thread(
81+
() -> {
82+
try {
83+
latch.await();
84+
KyuubiSQLException ex =
85+
assertThrows(KyuubiSQLException.class, () -> stmt.execute("SELECT 1"));
86+
assertEquals("Can't execute after statement has been closed", ex.getMessage());
87+
} catch (Throwable t) {
88+
assertionFailure.set(t);
89+
}
90+
});
91+
92+
thread1.start();
93+
thread2.start();
94+
95+
thread1.join();
96+
thread2.join();
97+
98+
if (assertionFailure.get() != null) {
99+
throw new AssertionError("Assertion failed in thread", assertionFailure.get());
100+
}
84101
}
85102
}
86103
}

0 commit comments

Comments
 (0)