Skip to content

Commit 2f088f9

Browse files
committed
WIP WIP WIP Handle indeterminate YDB request state
1 parent 4cb10eb commit 2f088f9

File tree

21 files changed

+520
-164
lines changed

21 files changed

+520
-164
lines changed

aspect/src/main/java/tech/ydb/yoj/aspect/tx/YojTransactionAspect.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import org.aspectj.lang.annotation.Aspect;
77
import tech.ydb.yoj.repository.db.Tx;
88
import tech.ydb.yoj.repository.db.TxManager;
9+
import tech.ydb.yoj.repository.db.exception.ConditionallyRetryableException;
910
import tech.ydb.yoj.repository.db.exception.RetryableException;
1011

1112
/**
@@ -68,7 +69,7 @@ private Object doInTransaction(ProceedingJoinPoint pjp, YojTransactional transac
6869

6970
return localTx.tx(() -> safeCall(pjp));
7071
}
71-
} catch (CallRetryableException | CallException e) {
72+
} catch (CallRetryableException | CallConditionallyRetryableException | CallException e) {
7273
throw e.getCause();
7374
}
7475
}
@@ -88,17 +89,28 @@ Object safeCall(ProceedingJoinPoint pjp) {
8889
return pjp.proceed();
8990
} catch (RetryableException e) {
9091
throw new CallRetryableException(e);
92+
} catch (ConditionallyRetryableException e) {
93+
throw new CallConditionallyRetryableException(e);
9194
} catch (Throwable e) {
9295
throw new CallException(e);
9396
}
9497
}
9598

9699
/**
97-
* It's a hint for tx manager to retry was requested
100+
* It's a hint for tx manager that an unconditional retry was requested
98101
*/
99102
static class CallRetryableException extends RetryableException {
100103
CallRetryableException(RetryableException e) {
101-
super(e.getMessage(), e.getCause());
104+
super(e.getMessage(), e.getRetryPolicy(), e.getCause());
105+
}
106+
}
107+
108+
/**
109+
* It's a hint for tx manager that a conditional retry was requested
110+
*/
111+
static class CallConditionallyRetryableException extends ConditionallyRetryableException {
112+
CallConditionallyRetryableException(ConditionallyRetryableException e) {
113+
super(e.getMessage(), e.getRetryPolicy(), e.getCause());
102114
}
103115
}
104116

repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryRepositoryTransaction.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@
2323
import java.util.function.Supplier;
2424

2525
public class InMemoryRepositoryTransaction implements BaseDb, RepositoryTransaction {
26-
private final static AtomicLong txIdGenerator = new AtomicLong();
26+
private static final String CLOSE_ACTION_COMMIT = "commit()";
27+
private static final String CLOSE_ACTION_ROLLBACK = "rollback()";
28+
29+
private static final AtomicLong txIdGenerator = new AtomicLong();
2730

2831
private final long txId = txIdGenerator.incrementAndGet();
2932
private final Stopwatch txStopwatch = Stopwatch.createStarted();
@@ -81,7 +84,12 @@ public void commit() {
8184
if (isBadSession) {
8285
throw new IllegalStateException("Transaction was invalidated. Commit isn't possible");
8386
}
84-
endTransaction("commit()", this::commitImpl);
87+
endTransaction(CLOSE_ACTION_COMMIT, this::commitImpl);
88+
}
89+
90+
@Override
91+
public boolean wasCommitAttempted() {
92+
return CLOSE_ACTION_COMMIT.equals(closeAction);
8593
}
8694

8795
private void commitImpl() {
@@ -101,14 +109,15 @@ private void commitImpl() {
101109

102110
@Override
103111
public void rollback() {
104-
endTransaction("rollback()", this::rollbackImpl);
112+
endTransaction(CLOSE_ACTION_ROLLBACK, this::rollbackImpl);
105113
}
106114

107115
private void rollbackImpl() {
108116
storage.rollback(txId);
109117
}
110118

111119
private void endTransaction(String action, Runnable runnable) {
120+
ensureTransactionActive();
112121
try {
113122
if (isFinalActionNeeded(action)) {
114123
logTransaction(action, runnable);
@@ -134,6 +143,7 @@ private boolean isFinalActionNeeded(String action) {
134143
final <T extends Entity<T>> void doInWriteTransaction(
135144
String log, TableDescriptor<T> tableDescriptor, Consumer<WriteTxDataShard<T>> consumer
136145
) {
146+
ensureTransactionActive();
137147
if (options.isScan()) {
138148
throw new IllegalTransactionScanException("Mutable operations");
139149
}
@@ -158,6 +168,7 @@ final <T extends Entity<T>> void doInWriteTransaction(
158168
final <T extends Entity<T>, R> R doInTransaction(
159169
String action, TableDescriptor<T> tableDescriptor, Function<ReadOnlyTxDataShard<T>, R> func
160170
) {
171+
ensureTransactionActive();
161172
return logTransaction(action, () -> {
162173
InMemoryTxLockWatcher findWatcher = hasWrites ? watcher : InMemoryTxLockWatcher.NO_LOCKS;
163174
ReadOnlyTxDataShard<T> shard = storage.getReadOnlyTxDataShard(
@@ -180,10 +191,6 @@ private void logTransaction(String action, Runnable runnable) {
180191
}
181192

182193
private <R> R logTransaction(String action, Supplier<R> supplier) {
183-
if (closeAction != null) {
184-
throw new IllegalStateException("Transaction already closed by " + closeAction);
185-
}
186-
187194
Stopwatch sw = Stopwatch.createStarted();
188195
try {
189196
R result = supplier.get();
@@ -195,6 +202,12 @@ private <R> R logTransaction(String action, Supplier<R> supplier) {
195202
}
196203
}
197204

205+
private void ensureTransactionActive() {
206+
if (closeAction != null) {
207+
throw new IllegalStateException("Transaction already closed by " + closeAction);
208+
}
209+
}
210+
198211
private String printResult(Object result) {
199212
if (result instanceof Iterable<?>) {
200213
long size = Iterables.size((Iterable<?>) result);
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
package tech.ydb.yoj.repository.ydb.exception;
22

33
import tech.ydb.yoj.repository.db.exception.RetryableException;
4+
import tech.ydb.yoj.util.retry.RetryPolicy;
45

56
/**
67
* Tried to use a no longer active or valid YDB session, e.g. on a node that is now down.
78
*/
89
public class BadSessionException extends RetryableException {
910
public BadSessionException(String message) {
10-
super(message);
11+
super(message, RetryPolicy.retryImmediately());
1112
}
1213
}

repository-ydb-common/src/main/java/tech/ydb/yoj/repository/ydb/exception/YdbClientInternalException.java

Lines changed: 0 additions & 23 deletions
This file was deleted.
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package tech.ydb.yoj.repository.ydb.exception;
2+
3+
import lombok.Getter;
4+
import tech.ydb.yoj.ExperimentalApi;
5+
import tech.ydb.yoj.repository.db.exception.ConditionallyRetryableException;
6+
import tech.ydb.yoj.util.lang.Strings;
7+
import tech.ydb.yoj.util.retry.RetryPolicy;
8+
9+
/**
10+
* Base class for <em>conditionally-retryable</em> exceptions from the YDB database, the YDB Java SDK, and the GRPC client used by the YDB Java SDK.
11+
*
12+
* @see ConditionallyRetryableException conditionally-retryable exceptions
13+
*/
14+
// TODO(nvamelichev): Add subclasses of YdbConditionallyRetryableException as needed
15+
@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/165")
16+
public class YdbConditionallyRetryableException extends ConditionallyRetryableException {
17+
private static final RetryPolicy UNDETERMINED_BACKOFF = RetryPolicy.expBackoff(5L, 500L, 0.1, 2.0);
18+
19+
@Getter
20+
private final Enum<?> statusCode;
21+
22+
public YdbConditionallyRetryableException(String message, Enum<?> statusCode, Object request, Object response) {
23+
super(Strings.join("\n", "[" + statusCode + "] " + message, request, response), UNDETERMINED_BACKOFF);
24+
this.statusCode = statusCode;
25+
}
26+
}

repository-ydb-common/src/main/java/tech/ydb/yoj/repository/ydb/exception/YdbUnauthenticatedException.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@
44
import tech.ydb.yoj.repository.db.exception.RetryableException;
55
import tech.ydb.yoj.repository.db.exception.UnavailableException;
66
import tech.ydb.yoj.util.lang.Strings;
7+
import tech.ydb.yoj.util.retry.RetryPolicy;
78

89
/**
910
* YDB authentication failure, possibly a transient one. E.g., used a recently expired token.
1011
*/
1112
public class YdbUnauthenticatedException extends RetryableException {
1213
public YdbUnauthenticatedException(Object request, Object response) {
13-
super(Strings.join("\n", request, response));
14+
super(Strings.join("\n", request, response), RetryPolicy.retryImmediately());
1415
}
1516

1617
@Override

repository-ydb-common/src/main/java/tech/ydb/yoj/repository/ydb/exception/YdbUnauthorizedException.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,15 @@
44
import tech.ydb.yoj.repository.db.exception.RetryableException;
55
import tech.ydb.yoj.repository.db.exception.UnavailableException;
66
import tech.ydb.yoj.util.lang.Strings;
7+
import tech.ydb.yoj.util.retry.RetryPolicy;
78

89
/**
910
* YDB authorization failure, possibly a transient one. E.g., the principal tried to write to the database but has no
1011
* write-allowing role assigned.
1112
*/
1213
public class YdbUnauthorizedException extends RetryableException {
1314
public YdbUnauthorizedException(Object request, Object response) {
14-
super(Strings.join("\n", request, response));
15+
super(Strings.join("\n", request, response), RetryPolicy.retryImmediately());
1516
}
1617

1718
@Override

repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,11 @@
5252
import tech.ydb.yoj.repository.db.bulk.BulkParams;
5353
import tech.ydb.yoj.repository.db.cache.RepositoryCache;
5454
import tech.ydb.yoj.repository.db.cache.TransactionLocal;
55+
import tech.ydb.yoj.repository.db.exception.ConditionallyRetryableException;
5556
import tech.ydb.yoj.repository.db.exception.IllegalTransactionIsolationLevelException;
5657
import tech.ydb.yoj.repository.db.exception.IllegalTransactionScanException;
5758
import tech.ydb.yoj.repository.db.exception.OptimisticLockException;
5859
import tech.ydb.yoj.repository.db.exception.RepositoryException;
59-
import tech.ydb.yoj.repository.db.exception.UnavailableException;
6060
import tech.ydb.yoj.repository.db.readtable.ReadTableParams;
6161
import tech.ydb.yoj.repository.ydb.bulk.BulkMapper;
6262
import tech.ydb.yoj.repository.ydb.client.ResultSetConverter;
@@ -65,8 +65,6 @@
6565
import tech.ydb.yoj.repository.ydb.exception.BadSessionException;
6666
import tech.ydb.yoj.repository.ydb.exception.ResultTruncatedException;
6767
import tech.ydb.yoj.repository.ydb.exception.UnexpectedException;
68-
import tech.ydb.yoj.repository.ydb.exception.YdbComponentUnavailableException;
69-
import tech.ydb.yoj.repository.ydb.exception.YdbOverloadedException;
7068
import tech.ydb.yoj.repository.ydb.exception.YdbRepositoryException;
7169
import tech.ydb.yoj.repository.ydb.merge.QueriesMerger;
7270
import tech.ydb.yoj.repository.ydb.readtable.ReadTableMapper;
@@ -100,6 +98,9 @@ public class YdbRepositoryTransaction<REPO extends YdbRepository>
10098
private static final String PROP_TRACE_VERBOSE_OBJ_PARAMS = "tech.ydb.yoj.repository.ydb.trace.verboseObjParams";
10199
private static final String PROP_TRACE_VERBOSE_OBJ_RESULTS = "tech.ydb.yoj.repository.ydb.trace.verboseObjResults";
102100

101+
private static final String CLOSE_ACTION_COMMIT = "commit()";
102+
private static final String CLOSE_ACTION_ROLLBACK = "rollback()";
103+
103104
private final List<YdbRepository.Query<?>> pendingWrites = new ArrayList<>();
104105
private final List<YdbSpliterator<?>> spliterators = new ArrayList<>();
105106

@@ -154,16 +155,21 @@ public void commit() {
154155
rollback();
155156
throw t;
156157
}
157-
endTransaction("commit", this::doCommit);
158+
endTransaction(CLOSE_ACTION_COMMIT, this::doCommit);
159+
}
160+
161+
@Override
162+
public boolean wasCommitAttempted() {
163+
return CLOSE_ACTION_COMMIT.equals(closeAction);
158164
}
159165

160166
@Override
161167
public void rollback() {
162168
Interrupts.runInCleanupMode(() -> {
163169
try {
164-
endTransaction("rollback", () -> {
170+
endTransaction(CLOSE_ACTION_ROLLBACK, () -> {
165171
Status status = YdbOperations.safeJoin(session.rollbackTransaction(txId, new RollbackTxSettings()));
166-
validate("rollback", status.getCode(), status.toString());
172+
validate(CLOSE_ACTION_ROLLBACK, status.getCode(), status.toString());
167173
});
168174
} catch (Throwable t) {
169175
log.info("Failed to rollback the transaction", t);
@@ -172,13 +178,9 @@ public void rollback() {
172178
}
173179

174180
private void doCommit() {
175-
try {
176-
Status status = YdbOperations.safeJoin(session.commitTransaction(txId, new CommitTxSettings()));
177-
validatePkConstraint(status.getIssues());
178-
validate("commit", status.getCode(), status.toString());
179-
} catch (YdbComponentUnavailableException | YdbOverloadedException e) {
180-
throw new UnavailableException("Unknown transaction state: commit was sent, but result is unknown", e);
181-
}
181+
Status status = YdbOperations.safeJoin(session.commitTransaction(txId, new CommitTxSettings()));
182+
validatePkConstraint(status.getIssues());
183+
validate(CLOSE_ACTION_COMMIT, status.getCode(), status.toString());
182184
}
183185

184186
private void closeStreams() {
@@ -209,6 +211,8 @@ private void validate(String request, StatusCode statusCode, String response) {
209211
} catch (BadSessionException | OptimisticLockException e) {
210212
transactionLocal.log().info("Request got %s: DB tx was invalidated", e.getClass().getSimpleName());
211213
throw e;
214+
} catch (ConditionallyRetryableException e) {
215+
throw options.canConditionallyRetry(CLOSE_ACTION_COMMIT.equals(request)) ? e : e.failImmediately();
212216
}
213217
}
214218

@@ -222,7 +226,7 @@ private boolean isFinalActionNeeded(String actionName) {
222226
return false;
223227
}
224228
if (options.isReadOnly() && options.getIsolationLevel() != IsolationLevel.SNAPSHOT) {
225-
transactionLocal.log().info("No-op %s: read-only tx @%s", actionName, options.getIsolationLevel());
229+
transactionLocal.log().info("No-op %s: read-only non-SNAPSHOT tx @%s", actionName, options.getIsolationLevel());
226230
return false;
227231
}
228232
if (txId == null) {

repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/client/YdbSessionManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import tech.ydb.table.TableClient;
66
import tech.ydb.yoj.InternalApi;
77
import tech.ydb.yoj.repository.db.exception.QueryInterruptedException;
8-
import tech.ydb.yoj.repository.db.exception.RetryableException;
8+
import tech.ydb.yoj.repository.db.exception.RetryableExceptionBase;
99
import tech.ydb.yoj.repository.db.exception.UnavailableException;
1010
import tech.ydb.yoj.repository.ydb.metrics.GaugeSupplierCollector;
1111

@@ -78,7 +78,7 @@ public void warmup() {
7878
try {
7979
session = getSession();
8080
break;
81-
} catch (RetryableException ex) {
81+
} catch (RetryableExceptionBase ex) {
8282
if (i == maxRetrySessionCreateCount - 1) {
8383
throw ex;
8484
}

0 commit comments

Comments
 (0)