Skip to content

Commit a6349dc

Browse files
ruanwenjunruanwenjun
authored andcommitted
Move stmtHandleChangeLock to the head when cancel KyuubiStatement
1 parent e6509c7 commit a6349dc

File tree

2 files changed

+31
-24
lines changed

2 files changed

+31
-24
lines changed

kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
4545
private volatile TOperationHandle stmtHandle = null;
4646
// This lock must be acquired before modifying or judge stmt
4747
// to ensure there are no concurrent accesses or race conditions.
48-
private final Lock stmtHandleChangeLock = new ReentrantLock();
48+
private final Lock stmtHandleAccessLock = new ReentrantLock();
4949
private final TSessionHandle sessHandle;
5050
Map<String, String> sessConf = new HashMap<>();
5151
private int fetchSize = DEFAULT_FETCH_SIZE;
@@ -130,11 +130,11 @@ public KyuubiStatement(
130130
@Override
131131
public void cancel() throws SQLException {
132132
try {
133+
stmtHandleAccessLock.lock();
133134
checkConnection("cancel");
134135
if (isCancelled) {
135136
return;
136137
}
137-
stmtHandleChangeLock.lock();
138138
if (stmtHandle != null) {
139139
TCancelOperationReq cancelReq = new TCancelOperationReq(stmtHandle);
140140
TCancelOperationResp cancelResp = client.CancelOperation(cancelReq);
@@ -146,7 +146,7 @@ public void cancel() throws SQLException {
146146
} catch (Exception e) {
147147
throw new KyuubiSQLException(e.toString(), "08S01", e);
148148
} finally {
149-
stmtHandleChangeLock.unlock();
149+
stmtHandleAccessLock.unlock();
150150
}
151151
}
152152

@@ -185,7 +185,7 @@ void closeClientOperation() throws SQLException {
185185
@Override
186186
public void close() throws SQLException {
187187
try {
188-
stmtHandleChangeLock.lock();
188+
stmtHandleAccessLock.lock();
189189
if (isClosed) {
190190
return;
191191
}
@@ -194,7 +194,7 @@ public void close() throws SQLException {
194194
closeResultSet();
195195
isClosed = true;
196196
} finally {
197-
stmtHandleChangeLock.unlock();
197+
stmtHandleAccessLock.unlock();
198198
}
199199
}
200200

@@ -324,7 +324,7 @@ private void runAsyncOnServer(String sql) throws SQLException {
324324
}
325325

326326
private void runAsyncOnServer(String sql, Map<String, String> confOneTime) throws SQLException {
327-
stmtHandleChangeLock.lock();
327+
stmtHandleAccessLock.lock();
328328
try {
329329
checkConnection("execute");
330330

@@ -360,7 +360,7 @@ private void runAsyncOnServer(String sql, Map<String, String> confOneTime) throw
360360
throw new KyuubiSQLException(ex.toString(), "08S01", ex);
361361
}
362362
} finally {
363-
stmtHandleChangeLock.unlock();
363+
stmtHandleAccessLock.unlock();
364364
}
365365
}
366366

kyuubi-hive-jdbc/src/test/java/org/apache/kyuubi/jdbc/hive/KyuubiStatementTest.java

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@
2121
import static org.junit.Assert.assertThrows;
2222

2323
import java.sql.SQLException;
24+
import java.util.concurrent.ExecutionException;
2425
import java.util.concurrent.ExecutorService;
2526
import java.util.concurrent.Executors;
26-
import org.junit.Assert;
27+
import java.util.concurrent.Future;
2728
import org.junit.Test;
2829

2930
public class KyuubiStatementTest {
@@ -60,25 +61,31 @@ public void testaddBatch() throws SQLException {
6061
}
6162

6263
@Test
63-
public void testThrowKyuubiSQLExceptionWhenExecuteSqlOnClosedStmt() throws SQLException {
64+
public void testThrowKyuubiSQLExceptionWhenExecuteSqlOnClosedStmt()
65+
throws SQLException, ExecutionException, InterruptedException {
6466
KyuubiStatement stmt = new KyuubiStatement(null, null, null);
6567
try {
6668
ExecutorService executorService = Executors.newFixedThreadPool(2);
67-
executorService.submit(
68-
() -> {
69-
try {
70-
stmt.close();
71-
} catch (SQLException e) {
72-
throw new RuntimeException(e);
73-
}
74-
});
75-
executorService.submit(
76-
() -> {
77-
Assert.assertEquals(
78-
"Can't exectue after statement has been closed",
79-
assertThrows(KyuubiSQLException.class, () -> stmt.execute("SELECT 1"))
80-
.getMessage());
81-
});
69+
Future<?> closeFuture =
70+
executorService.submit(
71+
() -> {
72+
try {
73+
stmt.close();
74+
} catch (SQLException e) {
75+
throw new RuntimeException(e);
76+
}
77+
});
78+
Future<?> submitFuture =
79+
executorService.submit(
80+
() -> {
81+
assertEquals(
82+
"Can't execute after statement has been closed",
83+
assertThrows(KyuubiSQLException.class, () -> stmt.execute("SELECT 1"))
84+
.getMessage());
85+
});
86+
closeFuture.get();
87+
submitFuture.get();
88+
executorService.shutdown();
8289
} finally {
8390
stmt.close();
8491
}

0 commit comments

Comments
 (0)