From a2a0d8cba6dd671895b4e5c1906672161445dbe6 Mon Sep 17 00:00:00 2001 From: Nikolai Amelichev Date: Fri, 29 Aug 2025 18:35:28 +0200 Subject: [PATCH] WIP WIP WIP Handle indeterminate YDB request state --- .../yoj/aspect/tx/YojTransactionAspect.java | 18 +- .../InMemoryRepositoryTransaction.java | 27 ++- .../ydb/exception/BadSessionException.java | 3 +- .../exception/YdbClientInternalException.java | 23 --- .../YdbConditionallyRetryableException.java | 29 +++ .../YdbUnauthenticatedException.java | 3 +- .../exception/YdbUnauthorizedException.java | 3 +- .../ydb/yoj/repository/ydb/YdbOperations.java | 4 +- .../ydb/YdbRepositoryTransaction.java | 48 ++--- .../yoj/repository/ydb/YdbSpliterator.java | 2 +- .../ydb/client/YdbSessionManager.java | 10 +- .../repository/ydb/client/YdbValidator.java | 165 ++++++++++------ .../ydb/YdbRepositoryIntegrationTest.java | 182 +++++++++++++++--- .../repository/db/ConditionalRetryMode.java | 25 +++ .../repository/db/RepositoryTransaction.java | 18 +- .../ydb/yoj/repository/db/StdTxManager.java | 36 +++- .../tech/ydb/yoj/repository/db/TxManager.java | 28 ++- .../tech/ydb/yoj/repository/db/TxOptions.java | 34 ++++ .../ConditionallyRetryableException.java | 41 ++++ .../EntityAlreadyExistsException.java | 4 +- .../db/exception/OptimisticLockException.java | 6 +- .../db/exception/RetryableException.java | 37 +--- .../db/exception/RetryableExceptionBase.java | 46 +++++ 23 files changed, 591 insertions(+), 201 deletions(-) delete mode 100644 repository-ydb-common/src/main/java/tech/ydb/yoj/repository/ydb/exception/YdbClientInternalException.java create mode 100644 repository-ydb-common/src/main/java/tech/ydb/yoj/repository/ydb/exception/YdbConditionallyRetryableException.java create mode 100644 repository/src/main/java/tech/ydb/yoj/repository/db/ConditionalRetryMode.java create mode 100644 repository/src/main/java/tech/ydb/yoj/repository/db/exception/ConditionallyRetryableException.java create mode 100644 repository/src/main/java/tech/ydb/yoj/repository/db/exception/RetryableExceptionBase.java diff --git a/aspect/src/main/java/tech/ydb/yoj/aspect/tx/YojTransactionAspect.java b/aspect/src/main/java/tech/ydb/yoj/aspect/tx/YojTransactionAspect.java index f295e594..b03460b8 100644 --- a/aspect/src/main/java/tech/ydb/yoj/aspect/tx/YojTransactionAspect.java +++ b/aspect/src/main/java/tech/ydb/yoj/aspect/tx/YojTransactionAspect.java @@ -6,6 +6,7 @@ import org.aspectj.lang.annotation.Aspect; import tech.ydb.yoj.repository.db.Tx; import tech.ydb.yoj.repository.db.TxManager; +import tech.ydb.yoj.repository.db.exception.ConditionallyRetryableException; import tech.ydb.yoj.repository.db.exception.RetryableException; /** @@ -68,7 +69,7 @@ private Object doInTransaction(ProceedingJoinPoint pjp, YojTransactional transac return localTx.tx(() -> safeCall(pjp)); } - } catch (CallRetryableException | CallException e) { + } catch (CallRetryableException | CallConditionallyRetryableException | CallException e) { throw e.getCause(); } } @@ -88,17 +89,28 @@ Object safeCall(ProceedingJoinPoint pjp) { return pjp.proceed(); } catch (RetryableException e) { throw new CallRetryableException(e); + } catch (ConditionallyRetryableException e) { + throw new CallConditionallyRetryableException(e); } catch (Throwable e) { throw new CallException(e); } } /** - * It's a hint for tx manager to retry was requested + * It's a hint for tx manager that an unconditional retry was requested */ static class CallRetryableException extends RetryableException { CallRetryableException(RetryableException e) { - super(e.getMessage(), e.getCause()); + super(e.getMessage(), e.getRetryPolicy(), e.getCause()); + } + } + + /** + * It's a hint for tx manager that a conditional retry was requested + */ + static class CallConditionallyRetryableException extends ConditionallyRetryableException { + CallConditionallyRetryableException(ConditionallyRetryableException e) { + super(e.getMessage(), e.getRetryPolicy(), e.getCause()); } } diff --git a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryRepositoryTransaction.java b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryRepositoryTransaction.java index 8aa48337..108b2ff1 100644 --- a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryRepositoryTransaction.java +++ b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryRepositoryTransaction.java @@ -23,7 +23,10 @@ import java.util.function.Supplier; public class InMemoryRepositoryTransaction implements BaseDb, RepositoryTransaction { - private final static AtomicLong txIdGenerator = new AtomicLong(); + private static final String CLOSE_ACTION_COMMIT = "commit()"; + private static final String CLOSE_ACTION_ROLLBACK = "rollback()"; + + private static final AtomicLong txIdGenerator = new AtomicLong(); private final long txId = txIdGenerator.incrementAndGet(); private final Stopwatch txStopwatch = Stopwatch.createStarted(); @@ -81,7 +84,12 @@ public void commit() { if (isBadSession) { throw new IllegalStateException("Transaction was invalidated. Commit isn't possible"); } - endTransaction("commit()", this::commitImpl); + endTransaction(CLOSE_ACTION_COMMIT, this::commitImpl); + } + + @Override + public boolean wasCommitAttempted() { + return CLOSE_ACTION_COMMIT.equals(closeAction); } private void commitImpl() { @@ -101,7 +109,7 @@ private void commitImpl() { @Override public void rollback() { - endTransaction("rollback()", this::rollbackImpl); + endTransaction(CLOSE_ACTION_ROLLBACK, this::rollbackImpl); } private void rollbackImpl() { @@ -109,6 +117,7 @@ private void rollbackImpl() { } private void endTransaction(String action, Runnable runnable) { + ensureTransactionActive(); try { if (isFinalActionNeeded(action)) { logTransaction(action, runnable); @@ -134,6 +143,7 @@ private boolean isFinalActionNeeded(String action) { final > void doInWriteTransaction( String log, TableDescriptor tableDescriptor, Consumer> consumer ) { + ensureTransactionActive(); if (options.isScan()) { throw new IllegalTransactionScanException("Mutable operations"); } @@ -158,6 +168,7 @@ final > void doInWriteTransaction( final , R> R doInTransaction( String action, TableDescriptor tableDescriptor, Function, R> func ) { + ensureTransactionActive(); return logTransaction(action, () -> { InMemoryTxLockWatcher findWatcher = hasWrites ? watcher : InMemoryTxLockWatcher.NO_LOCKS; ReadOnlyTxDataShard shard = storage.getReadOnlyTxDataShard( @@ -180,10 +191,6 @@ private void logTransaction(String action, Runnable runnable) { } private R logTransaction(String action, Supplier supplier) { - if (closeAction != null) { - throw new IllegalStateException("Transaction already closed by " + closeAction); - } - Stopwatch sw = Stopwatch.createStarted(); try { R result = supplier.get(); @@ -195,6 +202,12 @@ private R logTransaction(String action, Supplier supplier) { } } + private void ensureTransactionActive() { + if (closeAction != null) { + throw new IllegalStateException("Transaction already closed by " + closeAction); + } + } + private String printResult(Object result) { if (result instanceof Iterable) { long size = Iterables.size((Iterable) result); diff --git a/repository-ydb-common/src/main/java/tech/ydb/yoj/repository/ydb/exception/BadSessionException.java b/repository-ydb-common/src/main/java/tech/ydb/yoj/repository/ydb/exception/BadSessionException.java index b7a6d7b6..f274e997 100644 --- a/repository-ydb-common/src/main/java/tech/ydb/yoj/repository/ydb/exception/BadSessionException.java +++ b/repository-ydb-common/src/main/java/tech/ydb/yoj/repository/ydb/exception/BadSessionException.java @@ -1,12 +1,13 @@ package tech.ydb.yoj.repository.ydb.exception; import tech.ydb.yoj.repository.db.exception.RetryableException; +import tech.ydb.yoj.util.retry.RetryPolicy; /** * Tried to use a no longer active or valid YDB session, e.g. on a node that is now down. */ public class BadSessionException extends RetryableException { public BadSessionException(String message) { - super(message); + super(message, RetryPolicy.retryImmediately()); } } diff --git a/repository-ydb-common/src/main/java/tech/ydb/yoj/repository/ydb/exception/YdbClientInternalException.java b/repository-ydb-common/src/main/java/tech/ydb/yoj/repository/ydb/exception/YdbClientInternalException.java deleted file mode 100644 index 9ee3229b..00000000 --- a/repository-ydb-common/src/main/java/tech/ydb/yoj/repository/ydb/exception/YdbClientInternalException.java +++ /dev/null @@ -1,23 +0,0 @@ -package tech.ydb.yoj.repository.ydb.exception; - -import tech.ydb.yoj.repository.db.exception.RepositoryException; -import tech.ydb.yoj.repository.db.exception.RetryableException; -import tech.ydb.yoj.repository.db.exception.UnavailableException; -import tech.ydb.yoj.util.lang.Strings; -import tech.ydb.yoj.util.retry.RetryPolicy; - -/** - * Internal YDB SDK exception, caused by a transport failure, internal authorization/authentication error etc. - */ -public final class YdbClientInternalException extends RetryableException { - private static final RetryPolicy RETRY_POLICY = RetryPolicy.fixed(100L, 0.2); - - public YdbClientInternalException(Object request, Object response) { - super(Strings.join("\n", request, response), RETRY_POLICY); - } - - @Override - public RepositoryException rethrow() { - return UnavailableException.afterRetries("YDB SDK internal exception, retries failed", this); - } -} diff --git a/repository-ydb-common/src/main/java/tech/ydb/yoj/repository/ydb/exception/YdbConditionallyRetryableException.java b/repository-ydb-common/src/main/java/tech/ydb/yoj/repository/ydb/exception/YdbConditionallyRetryableException.java new file mode 100644 index 00000000..d2f3667b --- /dev/null +++ b/repository-ydb-common/src/main/java/tech/ydb/yoj/repository/ydb/exception/YdbConditionallyRetryableException.java @@ -0,0 +1,29 @@ +package tech.ydb.yoj.repository.ydb.exception; + +import lombok.Getter; +import tech.ydb.yoj.ExperimentalApi; +import tech.ydb.yoj.repository.db.exception.ConditionallyRetryableException; +import tech.ydb.yoj.util.lang.Strings; +import tech.ydb.yoj.util.retry.RetryPolicy; + +/** + * Base class for conditionally-retryable exceptions from the YDB database, the YDB Java SDK, and the GRPC client used by the YDB Java SDK. + * + * @see ConditionallyRetryableException Conditionally-retryable Exceptions + */ +@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/165") +public class YdbConditionallyRetryableException extends ConditionallyRetryableException { + private static final RetryPolicy UNDETERMINED_BACKOFF = RetryPolicy.expBackoff(5L, 500L, 0.1, 2.0); + + @Getter + private final Enum statusCode; + + public YdbConditionallyRetryableException(String message, Enum statusCode, Object request, Object response) { + this(message, statusCode, request, response, UNDETERMINED_BACKOFF); + } + + public YdbConditionallyRetryableException(String message, Enum statusCode, Object request, Object response, RetryPolicy retryPolicy) { + super(Strings.join("\n", "[" + statusCode + "] " + message, request, response), retryPolicy); + this.statusCode = statusCode; + } +} diff --git a/repository-ydb-common/src/main/java/tech/ydb/yoj/repository/ydb/exception/YdbUnauthenticatedException.java b/repository-ydb-common/src/main/java/tech/ydb/yoj/repository/ydb/exception/YdbUnauthenticatedException.java index 91c45b0b..c0c45b87 100644 --- a/repository-ydb-common/src/main/java/tech/ydb/yoj/repository/ydb/exception/YdbUnauthenticatedException.java +++ b/repository-ydb-common/src/main/java/tech/ydb/yoj/repository/ydb/exception/YdbUnauthenticatedException.java @@ -4,13 +4,14 @@ import tech.ydb.yoj.repository.db.exception.RetryableException; import tech.ydb.yoj.repository.db.exception.UnavailableException; import tech.ydb.yoj.util.lang.Strings; +import tech.ydb.yoj.util.retry.RetryPolicy; /** * YDB authentication failure, possibly a transient one. E.g., used a recently expired token. */ public class YdbUnauthenticatedException extends RetryableException { public YdbUnauthenticatedException(Object request, Object response) { - super(Strings.join("\n", request, response)); + super(Strings.join("\n", request, response), RetryPolicy.retryImmediately()); } @Override diff --git a/repository-ydb-common/src/main/java/tech/ydb/yoj/repository/ydb/exception/YdbUnauthorizedException.java b/repository-ydb-common/src/main/java/tech/ydb/yoj/repository/ydb/exception/YdbUnauthorizedException.java index ccf9c9fe..52ef3761 100644 --- a/repository-ydb-common/src/main/java/tech/ydb/yoj/repository/ydb/exception/YdbUnauthorizedException.java +++ b/repository-ydb-common/src/main/java/tech/ydb/yoj/repository/ydb/exception/YdbUnauthorizedException.java @@ -4,6 +4,7 @@ import tech.ydb.yoj.repository.db.exception.RetryableException; import tech.ydb.yoj.repository.db.exception.UnavailableException; import tech.ydb.yoj.util.lang.Strings; +import tech.ydb.yoj.util.retry.RetryPolicy; /** * YDB authorization failure, possibly a transient one. E.g., the principal tried to write to the database but has no @@ -11,7 +12,7 @@ */ public class YdbUnauthorizedException extends RetryableException { public YdbUnauthorizedException(Object request, Object response) { - super(Strings.join("\n", request, response)); + super(Strings.join("\n", request, response), RetryPolicy.retryImmediately()); } @Override diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbOperations.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbOperations.java index d8ef5c33..0a116438 100644 --- a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbOperations.java +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbOperations.java @@ -15,7 +15,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import static tech.ydb.yoj.repository.ydb.client.YdbValidator.checkGrpcContextStatus; +import static tech.ydb.yoj.repository.ydb.client.YdbValidator.checkGrpcTimeoutAndCancellation; import static tech.ydb.yoj.util.lang.Interrupts.isThreadInterrupted; @InternalApi @@ -40,7 +40,7 @@ private static RepositoryException convertToUnavailable(Throwable ex) { Thread.currentThread().interrupt(); return new QueryInterruptedException("DB query interrupted", ex); } - checkGrpcContextStatus(ex.getMessage(), ex); + checkGrpcTimeoutAndCancellation(ex.getMessage(), ex); return new UnavailableException("DB is unavailable", ex); } diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java index eab3bb9f..94ac9cdd 100644 --- a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java @@ -14,7 +14,6 @@ import tech.ydb.common.transaction.YdbTransaction; import tech.ydb.core.Result; import tech.ydb.core.Status; -import tech.ydb.core.StatusCode; import tech.ydb.proto.ValueProtos; import tech.ydb.table.Session; import tech.ydb.table.query.DataQueryResult; @@ -52,11 +51,11 @@ import tech.ydb.yoj.repository.db.bulk.BulkParams; import tech.ydb.yoj.repository.db.cache.RepositoryCache; import tech.ydb.yoj.repository.db.cache.TransactionLocal; +import tech.ydb.yoj.repository.db.exception.ConditionallyRetryableException; import tech.ydb.yoj.repository.db.exception.IllegalTransactionIsolationLevelException; import tech.ydb.yoj.repository.db.exception.IllegalTransactionScanException; import tech.ydb.yoj.repository.db.exception.OptimisticLockException; import tech.ydb.yoj.repository.db.exception.RepositoryException; -import tech.ydb.yoj.repository.db.exception.UnavailableException; import tech.ydb.yoj.repository.db.readtable.ReadTableParams; import tech.ydb.yoj.repository.ydb.bulk.BulkMapper; import tech.ydb.yoj.repository.ydb.client.ResultSetConverter; @@ -65,8 +64,6 @@ import tech.ydb.yoj.repository.ydb.exception.BadSessionException; import tech.ydb.yoj.repository.ydb.exception.ResultTruncatedException; import tech.ydb.yoj.repository.ydb.exception.UnexpectedException; -import tech.ydb.yoj.repository.ydb.exception.YdbComponentUnavailableException; -import tech.ydb.yoj.repository.ydb.exception.YdbOverloadedException; import tech.ydb.yoj.repository.ydb.exception.YdbRepositoryException; import tech.ydb.yoj.repository.ydb.merge.QueriesMerger; import tech.ydb.yoj.repository.ydb.readtable.ReadTableMapper; @@ -90,7 +87,6 @@ import static java.lang.Boolean.getBoolean; import static java.util.stream.Collectors.toList; import static lombok.AccessLevel.PRIVATE; -import static tech.ydb.yoj.repository.ydb.client.YdbValidator.validatePkConstraint; public class YdbRepositoryTransaction implements BaseDb, RepositoryTransaction, YdbTable.QueryExecutor { @@ -100,6 +96,9 @@ public class YdbRepositoryTransaction private static final String PROP_TRACE_VERBOSE_OBJ_PARAMS = "tech.ydb.yoj.repository.ydb.trace.verboseObjParams"; private static final String PROP_TRACE_VERBOSE_OBJ_RESULTS = "tech.ydb.yoj.repository.ydb.trace.verboseObjResults"; + private static final String CLOSE_ACTION_COMMIT = "commit()"; + private static final String CLOSE_ACTION_ROLLBACK = "rollback()"; + private final List> pendingWrites = new ArrayList<>(); private final List> spliterators = new ArrayList<>(); @@ -154,16 +153,21 @@ public void commit() { rollback(); throw t; } - endTransaction("commit", this::doCommit); + endTransaction(CLOSE_ACTION_COMMIT, this::doCommit); + } + + @Override + public boolean wasCommitAttempted() { + return CLOSE_ACTION_COMMIT.equals(closeAction); } @Override public void rollback() { Interrupts.runInCleanupMode(() -> { try { - endTransaction("rollback", () -> { + endTransaction(CLOSE_ACTION_ROLLBACK, () -> { Status status = YdbOperations.safeJoin(session.rollbackTransaction(txId, new RollbackTxSettings())); - validate("rollback", status.getCode(), status.toString()); + validate(CLOSE_ACTION_ROLLBACK, status, status.toString()); }); } catch (Throwable t) { log.info("Failed to rollback the transaction", t); @@ -172,13 +176,8 @@ public void rollback() { } private void doCommit() { - try { - Status status = YdbOperations.safeJoin(session.commitTransaction(txId, new CommitTxSettings())); - validatePkConstraint(status.getIssues()); - validate("commit", status.getCode(), status.toString()); - } catch (YdbComponentUnavailableException | YdbOverloadedException e) { - throw new UnavailableException("Unknown transaction state: commit was sent, but result is unknown", e); - } + Status status = YdbOperations.safeJoin(session.commitTransaction(txId, new CommitTxSettings())); + validate(CLOSE_ACTION_COMMIT, status, status.toString()); } private void closeStreams() { @@ -200,15 +199,17 @@ private void closeStreams() { } } - private void validate(String request, StatusCode statusCode, String response) { + private void validate(String request, Status status, String response) { if (!isBadSession) { - isBadSession = YdbValidator.isTransactionClosedByServer(statusCode); + isBadSession = YdbValidator.isTransactionClosedByServer(status); } try { - YdbValidator.validate(request, statusCode, response); + YdbValidator.validate(request, status, response); } catch (BadSessionException | OptimisticLockException e) { transactionLocal.log().info("Request got %s: DB tx was invalidated", e.getClass().getSimpleName()); throw e; + } catch (ConditionallyRetryableException e) { + throw options.canConditionallyRetry(CLOSE_ACTION_COMMIT.equals(request)) ? e : e.failImmediately(); } } @@ -222,7 +223,7 @@ private boolean isFinalActionNeeded(String actionName) { return false; } if (options.isReadOnly() && options.getIsolationLevel() != IsolationLevel.SNAPSHOT) { - transactionLocal.log().info("No-op %s: read-only tx @%s", actionName, options.getIsolationLevel()); + transactionLocal.log().info("No-op %s: read-only non-SNAPSHOT tx @%s", actionName, options.getIsolationLevel()); return false; } if (txId == null) { @@ -353,8 +354,7 @@ private List doExecuteDataQuery(Statement 1) { @@ -412,7 +412,7 @@ private List doExecuteScanQueryLegacy(Statement void bulkUpsert(BulkMapper mapper, List input, BulkParams pa settings ) ); - validate("bulkInsert", status.getCode(), status.toString()); + validate("bulkInsert", status, status.toString()); } catch (RepositoryException e) { throw e; } catch (Exception e) { @@ -580,7 +580,7 @@ public Stream readTable(ReadTableMapper ), params.getTimeout().plusMinutes(5) ); - validate("readTable", status.getCode(), status.toString()); + validate("readTable", status, status.toString()); }) ); return spliterator.makeStream(); diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbSpliterator.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbSpliterator.java index 6b6903e1..344e76a5 100644 --- a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbSpliterator.java +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbSpliterator.java @@ -69,7 +69,7 @@ protected YdbSpliterator(String request, boolean isOrdered, Duration streamWorkT if (error != null) { throw YdbOperations.convertToRepositoryException(error); } - validate(request, status.getCode(), status.toString()); + validate(request, status, status.toString()); }; } diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/client/YdbSessionManager.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/client/YdbSessionManager.java index 7c99365e..2607bd2a 100644 --- a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/client/YdbSessionManager.java +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/client/YdbSessionManager.java @@ -5,7 +5,7 @@ import tech.ydb.table.TableClient; import tech.ydb.yoj.InternalApi; import tech.ydb.yoj.repository.db.exception.QueryInterruptedException; -import tech.ydb.yoj.repository.db.exception.RetryableException; +import tech.ydb.yoj.repository.db.exception.RetryableExceptionBase; import tech.ydb.yoj.repository.db.exception.UnavailableException; import tech.ydb.yoj.repository.ydb.metrics.GaugeSupplierCollector; @@ -19,6 +19,8 @@ @InternalApi public final class YdbSessionManager implements SessionManager { + public static final String REQUEST_CREATE_SESSION = "session create"; + private static final GaugeSupplierCollector sessionStatCollector = GaugeSupplierCollector.build() .namespace("ydb") .subsystem("session_manager") @@ -45,7 +47,7 @@ public Session getSession() { CompletableFuture> future = tableClient.createSession(sessionTimeout); try { Result result = future.get(); - YdbValidator.validate("session create", result.getStatus().getCode(), result.toString()); + YdbValidator.validate(REQUEST_CREATE_SESSION, result.getStatus(), result.toString()); return result.getValue(); } catch (CancellationException | CompletionException | ExecutionException | InterruptedException e) { // We need to cancel future bacause in other case we can get session leak @@ -55,7 +57,7 @@ public Session getSession() { Thread.currentThread().interrupt(); throw new QueryInterruptedException("get session interrupted", e); } - YdbValidator.checkGrpcContextStatus(e.getMessage(), e); + YdbValidator.checkGrpcTimeoutAndCancellation(e.getMessage(), e); throw new UnavailableException("DB is unavailable", e); } @@ -78,7 +80,7 @@ public void warmup() { try { session = getSession(); break; - } catch (RetryableException ex) { + } catch (RetryableExceptionBase ex) { if (i == maxRetrySessionCreateCount - 1) { throw ex; } diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/client/YdbValidator.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/client/YdbValidator.java index b7355d6b..24bb359f 100644 --- a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/client/YdbValidator.java +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/client/YdbValidator.java @@ -5,15 +5,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import tech.ydb.core.Issue; +import tech.ydb.core.Status; import tech.ydb.core.StatusCode; import tech.ydb.yoj.InternalApi; import tech.ydb.yoj.repository.db.exception.DeadlineExceededException; import tech.ydb.yoj.repository.db.exception.EntityAlreadyExistsException; import tech.ydb.yoj.repository.db.exception.OptimisticLockException; import tech.ydb.yoj.repository.db.exception.QueryCancelledException; +import tech.ydb.yoj.repository.db.exception.UnavailableException; import tech.ydb.yoj.repository.ydb.exception.BadSessionException; -import tech.ydb.yoj.repository.ydb.exception.YdbClientInternalException; import tech.ydb.yoj.repository.ydb.exception.YdbComponentUnavailableException; +import tech.ydb.yoj.repository.ydb.exception.YdbConditionallyRetryableException; import tech.ydb.yoj.repository.ydb.exception.YdbOverloadedException; import tech.ydb.yoj.repository.ydb.exception.YdbRepositoryException; import tech.ydb.yoj.repository.ydb.exception.YdbSchemaException; @@ -21,10 +23,17 @@ import tech.ydb.yoj.repository.ydb.exception.YdbUnauthorizedException; import javax.annotation.Nullable; -import java.util.function.Function; +import java.util.function.Predicate; import static lombok.AccessLevel.PRIVATE; +import static tech.ydb.yoj.repository.ydb.client.YdbSessionManager.REQUEST_CREATE_SESSION; +/** + * @see YDB Server Status Codes + * @see YDB GRPC Status Codes + * @see YDB Status Codes + * and YDB Java SDK Status Codes + */ @InternalApi public final class YdbValidator { private static final Logger log = LoggerFactory.getLogger(YdbValidator.class); @@ -32,37 +41,56 @@ public final class YdbValidator { private YdbValidator() { } - public static void validate(String request, StatusCode statusCode, String response) { + public static void validate(String request, Status status, String response) { + StatusCode statusCode = status.getCode(); + Issue[] issues = status.getIssues(); + switch (statusCode) { // Success. Do nothing ;-) case SUCCESS -> { } // Current session can no longer be used. Retry immediately by creating a new session - case BAD_SESSION, - SESSION_EXPIRED, - // Prepared statement or transaction was not found - NOT_FOUND -> throw new BadSessionException(response); + case BAD_SESSION, // This session is no longer available. Create a new session + SESSION_EXPIRED, // The session has already expired. Create a new session + NOT_FOUND -> { // Prepared statement or transaction was not found in current session. Create a new session + throw new BadSessionException(response); + } // Transaction locks invalidated: somebody touched the same rows that we've read and/or changed in a SERIALIZABLE-level transaction. // Retry immediately case ABORTED -> throw new OptimisticLockException(response); + // The request was cancelled because the request timeout (CancelAfter) has expired. The request has been cancelled on the server. + // Non-retryable + case CANCELLED -> throw new QueryCancelledException(response); + + // YDB SDK's GRPC client's deadline expired **before** the request was sent to the server: + // - If there is an expired GRPC context deadline **for the current thread**, assume we're running inside a GRPC request handler + // => No retry, terminate immediately with DeadlineExceededException to answer GRPC request promptly. + // - If three is no GRPC context deadline for the current thread, but we timed out in session manager + // => Retry with slow backoff, hoping we acquire a session soon. + // - Otherwise, assume this was an internal YOJ deadline (see YdbOperations.safeJoin()) + // => Fail immediately with UnavailableException. + case CLIENT_DEADLINE_EXPIRED -> { + checkGrpcTimeoutAndCancellation(response, null); + + if (REQUEST_CREATE_SESSION.equals(request)) { + log.warn(""" + Timed out waiting to get a session from the pool + Request: {} + Response: {}""", request, response); + throw new YdbOverloadedException(request, response); + } else { + throw new UnavailableException(response); + } + } + // DB overloaded and similar conditions. Slow retry with exponential backoff - case OVERLOADED, - // DB took too long to respond - TIMEOUT, - // The request was cancelled because the request timeout (CancelAfter) has expired. The request has been cancelled on the server - CANCELLED, - // Not enough resources to process the request - CLIENT_RESOURCE_EXHAUSTED, - // Deadline expired before the request was sent to the server - CLIENT_DEADLINE_EXPIRED, - // The request was cancelled on the client, at the transport level (because the GRPC deadline expired) - CLIENT_DEADLINE_EXCEEDED -> { - checkGrpcContextStatus(response, null); + case OVERLOADED, // A part of the system is overloaded. Retry the last action (query) and reduce the query rate. + CLIENT_RESOURCE_EXHAUSTED -> { // Not enough resources to process the request + checkGrpcTimeoutAndCancellation(response, null); - // The result of the request is unknown; it might have been cancelled... or it executed successfully! log.warn(""" Database is overloaded, but we still got a reply from the DB Request: {} @@ -70,32 +98,50 @@ public static void validate(String request, StatusCode statusCode, String respon throw new YdbOverloadedException(request, response); } - // Unknown error on the client side (most often at the transport level). Fast retry with fixed interval - case CLIENT_CANCELLED, - CLIENT_GRPC_ERROR, - CLIENT_INTERNAL_ERROR -> { - checkGrpcContextStatus(response, null); + // The query cannot be executed in the current state. Non-retryable + // - Primary key/UNIQUE index violations are checked first; for these we throw EntityAlreadyExistsException. + // - All other "failed preconditions" unknown to YOJ are considered to be non-retryable, and we throw YdbRepositoryException. + case PRECONDITION_FAILED -> { + if (is(issues, IssueCode.CONSTRAINT_VIOLATION::matches)) { + throw new EntityAlreadyExistsException("Entity already exists: " + errorMessageFrom(issues)); + } else { + throw new YdbRepositoryException(request, response); + } + } + + // DB, one of its components, or the transport is temporarily unavailable. Fast retry with fixed interval + case UNAVAILABLE, // DB responded that it or some of its subsystems are unavailable + CLIENT_DISCOVERY_FAILED, // Error occurred while retrieving the list of endpoints + CLIENT_LIMITS_REACHED, // Client-side session limit reached + SESSION_BUSY -> { // Another query is being executed in this session, should retry with a new session + checkGrpcTimeoutAndCancellation(response, null); log.warn(""" - YDB SDK internal error or cancellation + Some database components are not available, but we still got a reply from the DB Request: {} Response: {}""", request, response); - throw new YdbClientInternalException(request, response); + throw new YdbComponentUnavailableException(request, response); } - // DB, one of its components, or the transport is temporarily unavailable. Fast retry with fixed interval - case UNAVAILABLE, // DB responded that it or some of its subsystems are unavailable - TRANSPORT_UNAVAILABLE, // Network connectivity issues - CLIENT_DISCOVERY_FAILED, // Error occurred while retrieving the list of endpoints - CLIENT_LIMITS_REACHED, // Client-side session limit reached - UNDETERMINED, - SESSION_BUSY, // Another query is being executed in this session, should retry with a new session - PRECONDITION_FAILED -> { + // The result of the request is unknown: it might have never reached the DB, have been cancelled or timed out... or executed successfully! + case TIMEOUT, // Query timeout expired. If the query is conditionally retryable, retry it + UNDETERMINED, // YDB indicates undetermined transaction state. We don't know if it has been committed or not + CLIENT_CANCELLED, // GRPC call to the DB has been cancelled. We don't know if the DB has performed the request or not + TRANSPORT_UNAVAILABLE, // Network connectivity issues. We don't know if the server performed the request or not + CLIENT_DEADLINE_EXCEEDED, // YDB SDK could not get GRPC response from the DB in time + CLIENT_INTERNAL_ERROR -> { // Internal YDB SDK error, assumed to be transient + checkGrpcTimeoutAndCancellation(response, null); + log.warn(""" - Some database components are not available, but we still got a reply from the DB + Indeterminate request state: it's not known whether the request reached the DB and was performed Request: {} Response: {}""", request, response); - throw new YdbComponentUnavailableException(request, response); + throw new YdbConditionallyRetryableException( + "Indeterminate request state: it's not known whether the request reached the DB and was performed", + statusCode, + request, + response + ); } // GRPC client reports that the request was not authenticated properly. Retry immediately. @@ -124,13 +170,13 @@ public static void validate(String request, StatusCode statusCode, String respon // Serious internal error. No retries case CLIENT_CALL_UNIMPLEMENTED, + CLIENT_GRPC_ERROR, BAD_REQUEST, UNSUPPORTED, INTERNAL_ERROR, GENERIC_ERROR, UNUSED_STATUS, - // This status is used by other YDB services (not the {Table,Query}Service). This is *NOT* a form of PRECONDITION_FAILED! - ALREADY_EXISTS -> { + ALREADY_EXISTS -> { // Used by other YDB services (not the {Table,Query}Service). This is *NOT* a form of PRECONDITION_FAILED! log.error(""" Bad response status Request: {} @@ -149,8 +195,8 @@ public static void validate(String request, StatusCode statusCode, String respon } } - public static boolean isTransactionClosedByServer(StatusCode statusCode) { - return switch (statusCode) { + public static boolean isTransactionClosedByServer(Status status) { + return switch (status.getCode()) { case UNUSED_STATUS, ALREADY_EXISTS, BAD_REQUEST, @@ -186,37 +232,42 @@ public static boolean isTransactionClosedByServer(StatusCode statusCode) { }; } - public static void checkGrpcContextStatus(String errorMessage, @Nullable Throwable cause) { - if (Context.current().getDeadline() != null && Context.current().getDeadline().isExpired()) { + /** + * Checks whether the current GRPC request context has an expired deadline or has been cancelled; if so, then no retries will be performed + * for even a retryable condition, and a {@link DeadlineExceededException} or {@link QueryCancelledException} will be thrown immediately. + * + * @param errorMessage error message + * @param cause the underlying exception, if any + */ + public static void checkGrpcTimeoutAndCancellation(String errorMessage, @Nullable Throwable cause) { + Context ctx = Context.current(); + if (ctx.getDeadline() != null && ctx.getDeadline().isExpired()) { // GRPC deadline for the current GRPC context has expired. We need to throw a separate exception to avoid retries throw new DeadlineExceededException("DB query deadline exceeded. Response from DB: " + errorMessage, cause); - } else if (Context.current().isCancelled()) { + } else if (ctx.isCancelled()) { // Client has cancelled the GRPC request. Throw a separate exception to avoid retries throw new QueryCancelledException("DB query cancelled. Response from DB: " + errorMessage); } } - private static boolean is(Issue[] issues, Function function) { + private static boolean is(Issue[] issues, Predicate predicate) { for (Issue issue : issues) { - if (function.apply(issue) || (issue.getIssues().length > 0 && is(issue.getIssues(), function))) { + if (predicate.test(issue) || (issue.getIssues().length > 0 && is(issue.getIssues(), predicate))) { return true; } } return false; } - public static void validatePkConstraint(Issue[] issues) { - if (is(issues, IssueCode.CONSTRAINT_VIOLATION::matches)) { - StringBuilder error = new StringBuilder(); - is(issues, m -> { - if (!error.isEmpty()) { - error.append(":"); - } - error.append(m.getMessage()); - return false; - }); - throw new EntityAlreadyExistsException("Entity already exists: " + error); + private static String errorMessageFrom(Issue[] issues) { + StringBuilder sb = new StringBuilder(); + for (Issue issue : issues) { + if (!sb.isEmpty()) { + sb.append(':'); + } + sb.append(issue.getMessage()); } + return sb.toString(); } @RequiredArgsConstructor(access = PRIVATE) diff --git a/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbRepositoryIntegrationTest.java b/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbRepositoryIntegrationTest.java index da77ea7b..5bafc455 100644 --- a/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbRepositoryIntegrationTest.java +++ b/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbRepositoryIntegrationTest.java @@ -48,6 +48,7 @@ import tech.ydb.yoj.databind.schema.Column; import tech.ydb.yoj.databind.schema.GlobalIndex; import tech.ydb.yoj.databind.schema.ObjectSchema; +import tech.ydb.yoj.repository.db.ConditionalRetryMode; import tech.ydb.yoj.repository.db.Entity; import tech.ydb.yoj.repository.db.EntitySchema; import tech.ydb.yoj.repository.db.IsolationLevel; @@ -58,9 +59,13 @@ import tech.ydb.yoj.repository.db.StdTxManager; import tech.ydb.yoj.repository.db.TableDescriptor; import tech.ydb.yoj.repository.db.Tx; +import tech.ydb.yoj.repository.db.TxOptions; +import tech.ydb.yoj.repository.db.TxOptions.RetryOptions; import tech.ydb.yoj.repository.db.bulk.BulkParams; import tech.ydb.yoj.repository.db.common.CommonConverters; +import tech.ydb.yoj.repository.db.exception.ConditionallyRetryableException; import tech.ydb.yoj.repository.db.exception.ConversionException; +import tech.ydb.yoj.repository.db.exception.RepositoryException; import tech.ydb.yoj.repository.db.exception.RetryableException; import tech.ydb.yoj.repository.db.exception.UnavailableException; import tech.ydb.yoj.repository.db.list.ListRequest; @@ -70,7 +75,6 @@ import tech.ydb.yoj.repository.test.entity.TestEntities; import tech.ydb.yoj.repository.test.sample.TestDb; import tech.ydb.yoj.repository.test.sample.TestDbImpl; -import tech.ydb.yoj.repository.test.sample.model.Book; import tech.ydb.yoj.repository.test.sample.model.Bubble; import tech.ydb.yoj.repository.test.sample.model.ChangefeedEntity; import tech.ydb.yoj.repository.test.sample.model.IndexedEntity; @@ -106,6 +110,7 @@ import java.lang.reflect.Field; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; @@ -487,33 +492,95 @@ private static void ensureSameSessionId(SessionManager sessionManager, Session f } @Test - public void checkDBIsUnavailable() { - checkTxRetryableOnRequestError(StatusCodesProtos.StatusIds.StatusCode.UNAVAILABLE); - checkTxRetryableOnFlushingError(StatusCodesProtos.StatusIds.StatusCode.UNAVAILABLE); - checkTxNonRetryableOnCommit(StatusCodesProtos.StatusIds.StatusCode.UNAVAILABLE); + public void checkDbIsUnavailable() { + var possibleRetryOptions = Stream + .concat( + Stream.of(RetryOptions.DEFAULT), + // Use all ConditionalRetryMode values because UNAVAILABLE is UNconditionally retryable: + Arrays.stream(ConditionalRetryMode.values()).map(YdbRepositoryIntegrationTest::retryOptions) + ) + .toList(); + for (RetryOptions retryOptions : possibleRetryOptions) { + checkTxUnconditionallyRetryableOnRequest(retryOptions, StatusCodesProtos.StatusIds.StatusCode.UNAVAILABLE); + checkTxUnconditionallyRetryableOnFlushing(retryOptions, StatusCodesProtos.StatusIds.StatusCode.UNAVAILABLE); + checkTxUnconditionallyRetryableOnCommit(retryOptions, StatusCodesProtos.StatusIds.StatusCode.UNAVAILABLE); + } + } + + @Test + public void checkDbIsOverloaded() { + var possibleRetryOptions = Stream + .concat( + Stream.of(RetryOptions.DEFAULT), + // Use all ConditionalRetryMode values because OVERLOADED is UNconditionally retryable: + Arrays.stream(ConditionalRetryMode.values()).map(YdbRepositoryIntegrationTest::retryOptions) + ) + .toList(); + for (RetryOptions retryOptions : possibleRetryOptions) { + checkTxUnconditionallyRetryableOnRequest(retryOptions, StatusCodesProtos.StatusIds.StatusCode.OVERLOADED); + checkTxUnconditionallyRetryableOnFlushing(retryOptions, StatusCodesProtos.StatusIds.StatusCode.OVERLOADED); + checkTxUnconditionallyRetryableOnCommit(retryOptions, StatusCodesProtos.StatusIds.StatusCode.OVERLOADED); + } + } + + @Test + public void checkDbIsUndetermined() { + checkTxConditionallyRetryableOnRequest(RetryOptions.DEFAULT, StatusCodesProtos.StatusIds.StatusCode.UNDETERMINED); + checkTxConditionallyRetryableOnFlushing(RetryOptions.DEFAULT, StatusCodesProtos.StatusIds.StatusCode.UNDETERMINED); + checkTxUnavailableOnCommit(RetryOptions.DEFAULT, StatusCodesProtos.StatusIds.StatusCode.UNDETERMINED); + + checkTxConditionallyRetryableOnRequest(retryOptions(ConditionalRetryMode.ALWAYS), StatusCodesProtos.StatusIds.StatusCode.UNDETERMINED); + checkTxConditionallyRetryableOnFlushing(retryOptions(ConditionalRetryMode.ALWAYS), StatusCodesProtos.StatusIds.StatusCode.UNDETERMINED); + checkTxConditionallyRetryableOnCommit(retryOptions(ConditionalRetryMode.ALWAYS), StatusCodesProtos.StatusIds.StatusCode.UNDETERMINED); + + checkTxConditionallyRetryableOnRequest(retryOptions(ConditionalRetryMode.UNTIL_COMMIT), StatusCodesProtos.StatusIds.StatusCode.UNDETERMINED); + checkTxConditionallyRetryableOnFlushing(retryOptions(ConditionalRetryMode.UNTIL_COMMIT), StatusCodesProtos.StatusIds.StatusCode.UNDETERMINED); + checkTxUnavailableOnCommit(retryOptions(ConditionalRetryMode.UNTIL_COMMIT), StatusCodesProtos.StatusIds.StatusCode.UNDETERMINED); + + checkTxUnavailableOnRequest(retryOptions(ConditionalRetryMode.NEVER), StatusCodesProtos.StatusIds.StatusCode.UNDETERMINED); + checkTxUnavailableOnFlushing(retryOptions(ConditionalRetryMode.NEVER), StatusCodesProtos.StatusIds.StatusCode.UNDETERMINED); + checkTxUnavailableOnCommit(retryOptions(ConditionalRetryMode.NEVER), StatusCodesProtos.StatusIds.StatusCode.UNDETERMINED); } @Test - public void checkDBIsOverloaded() { - checkTxRetryableOnRequestError(StatusCodesProtos.StatusIds.StatusCode.OVERLOADED); - checkTxRetryableOnFlushingError(StatusCodesProtos.StatusIds.StatusCode.OVERLOADED); - checkTxNonRetryableOnCommit(StatusCodesProtos.StatusIds.StatusCode.OVERLOADED); + public void checkDbPreconditionFailed() { + var possibleRetryOptions = Stream + .concat( + Stream.of(RetryOptions.DEFAULT), + // Use all ConditionalRetryMode values because OVERLOADED is UNconditionally NON-retryable: + Arrays.stream(ConditionalRetryMode.values()).map(YdbRepositoryIntegrationTest::retryOptions) + ) + .toList(); + for (RetryOptions retryOptions : possibleRetryOptions) { + checkTxNonRetryableOnRequest(retryOptions, StatusCodesProtos.StatusIds.StatusCode.PRECONDITION_FAILED); + checkTxNonRetryableOnFlushing(retryOptions, StatusCodesProtos.StatusIds.StatusCode.PRECONDITION_FAILED); + checkTxNonRetryableOnCommit(retryOptions, StatusCodesProtos.StatusIds.StatusCode.PRECONDITION_FAILED); + } } @Test - public void checkDBSessionBusy() { - checkTxRetryableOnRequestError(StatusCodesProtos.StatusIds.StatusCode.PRECONDITION_FAILED); - checkTxRetryableOnFlushingError(StatusCodesProtos.StatusIds.StatusCode.PRECONDITION_FAILED); - checkTxNonRetryableOnCommit(StatusCodesProtos.StatusIds.StatusCode.PRECONDITION_FAILED); + public void checkDbSessionBusy() { + var possibleRetryOptions = Stream + .concat( + Stream.of(RetryOptions.DEFAULT), + // Use all ConditionalRetryMode values because SESSION_BUSY is UNconditionally retryable: + Arrays.stream(ConditionalRetryMode.values()).map(YdbRepositoryIntegrationTest::retryOptions) + ) + .toList(); + for (RetryOptions retryOptions : possibleRetryOptions) { + checkTxUnconditionallyRetryableOnRequest(retryOptions, StatusCodesProtos.StatusIds.StatusCode.SESSION_BUSY); + checkTxUnconditionallyRetryableOnFlushing(retryOptions, StatusCodesProtos.StatusIds.StatusCode.SESSION_BUSY); + checkTxUnconditionallyRetryableOnCommit(retryOptions, StatusCodesProtos.StatusIds.StatusCode.SESSION_BUSY); + } + } - checkTxRetryableOnRequestError(StatusCodesProtos.StatusIds.StatusCode.SESSION_BUSY); - checkTxRetryableOnFlushingError(StatusCodesProtos.StatusIds.StatusCode.SESSION_BUSY); - checkTxNonRetryableOnCommit(StatusCodesProtos.StatusIds.StatusCode.SESSION_BUSY); + private static RetryOptions retryOptions(ConditionalRetryMode crm) { + return RetryOptions.builder().conditionalRetryMode(crm).build(); } @Test public void subdirTable() { - Assertions.assertThat(((YdbRepository) repository).getSchemaOperations().getTableNames(true)) + assertThat(((YdbRepository) repository).getSchemaOperations().getTableNames(true)) .contains("subdir/SubdirEntity"); } @@ -935,15 +1002,68 @@ private void executeQuery(String expectSqlQuery, List expectRows, assertEquals(expectRows, actual); } - private void checkTxRetryableOnRequestError(StatusCodesProtos.StatusIds.StatusCode statusCode) { + private void checkTxNonRetryableOnRequest(RetryOptions retryOptions, StatusCodesProtos.StatusIds.StatusCode statusCode) { + checkTxOnRequest(retryOptions, statusCode, RepositoryException.class); + } + + private void checkTxUnavailableOnRequest(RetryOptions retryOptions, StatusCodesProtos.StatusIds.StatusCode statusCode) { + checkTxOnRequest(retryOptions, statusCode, UnavailableException.class); + } + + private void checkTxUnconditionallyRetryableOnRequest(RetryOptions retryOptions, StatusCodesProtos.StatusIds.StatusCode statusCode) { + checkTxOnRequest(retryOptions, statusCode, RetryableException.class); + } + + private void checkTxConditionallyRetryableOnRequest(RetryOptions retryOptions, StatusCodesProtos.StatusIds.StatusCode statusCode) { + checkTxOnRequest(retryOptions, statusCode, ConditionallyRetryableException.class); + } + + private void checkTxNonRetryableOnFlushing(RetryOptions retryOptions, StatusCodesProtos.StatusIds.StatusCode statusCode) { + checkTxErrorOnFlushing(retryOptions, statusCode, RepositoryException.class); + } + + private void checkTxUnavailableOnFlushing(RetryOptions retryOptions, StatusCodesProtos.StatusIds.StatusCode statusCode) { + checkTxErrorOnFlushing(retryOptions, statusCode, UnavailableException.class); + } + + private void checkTxUnconditionallyRetryableOnFlushing(RetryOptions retryOptions, StatusCodesProtos.StatusIds.StatusCode statusCode) { + checkTxErrorOnFlushing(retryOptions, statusCode, RetryableException.class); + } + + private void checkTxConditionallyRetryableOnFlushing(RetryOptions retryOptions, StatusCodesProtos.StatusIds.StatusCode statusCode) { + checkTxErrorOnFlushing(retryOptions, statusCode, ConditionallyRetryableException.class); + } + + private void checkTxNonRetryableOnCommit(RetryOptions retryOptions, StatusCodesProtos.StatusIds.StatusCode statusCode) { + checkTxErrorOnCommit(retryOptions, statusCode, RepositoryException.class); + } + + private void checkTxUnavailableOnCommit(RetryOptions retryOptions, StatusCodesProtos.StatusIds.StatusCode statusCode) { + checkTxErrorOnCommit(retryOptions, statusCode, UnavailableException.class); + } + + private void checkTxUnconditionallyRetryableOnCommit(RetryOptions retryOptions, StatusCodesProtos.StatusIds.StatusCode statusCode) { + checkTxErrorOnCommit(retryOptions, statusCode, RetryableException.class); + } + + private void checkTxConditionallyRetryableOnCommit(RetryOptions retryOptions, StatusCodesProtos.StatusIds.StatusCode statusCode) { + checkTxErrorOnCommit(retryOptions, statusCode, ConditionallyRetryableException.class); + } + + private void checkTxOnRequest(RetryOptions retryOptions, + StatusCodesProtos.StatusIds.StatusCode statusCode, + Class exceptionType) { YdbRepository proxiedRepository = new YdbRepository(getProxyServerConfig()); try { - RepositoryTransaction tx = proxiedRepository.startTransaction(); + RepositoryTransaction tx = proxiedRepository.startTransaction( + TxOptions.create(IsolationLevel.SERIALIZABLE_READ_WRITE) + .withRetryOptions(retryOptions) + ); runWithModifiedStatusCode( statusCode, () -> { - assertThatExceptionOfType(RetryableException.class) + assertThatExceptionOfType(exceptionType) .isThrownBy(tx.table(Project.class)::findAll); // This rollback is only a silent DB rollback, since the last transaction statement was exceptional. @@ -956,16 +1076,21 @@ private void checkTxRetryableOnRequestError(StatusCodesProtos.StatusIds.StatusCo } } - private void checkTxRetryableOnFlushingError(StatusCodesProtos.StatusIds.StatusCode statusCode) { + private void checkTxErrorOnFlushing(RetryOptions retryOptions, + StatusCodesProtos.StatusIds.StatusCode statusCode, + Class exceptionType) { YdbRepository proxiedRepository = new YdbRepository(getProxyServerConfig()); try { runWithModifiedStatusCode( statusCode, () -> { - RepositoryTransaction tx = proxiedRepository.startTransaction(); + RepositoryTransaction tx = proxiedRepository.startTransaction( + TxOptions.create(IsolationLevel.SERIALIZABLE_READ_WRITE) + .withRetryOptions(retryOptions) + ); tx.table(Project.class).save(new Project(new Project.Id("1"), "x")); - assertThatExceptionOfType(RetryableException.class) + assertThatExceptionOfType(exceptionType) .isThrownBy(tx::commit); } ); @@ -974,16 +1099,21 @@ private void checkTxRetryableOnFlushingError(StatusCodesProtos.StatusIds.StatusC } } - private void checkTxNonRetryableOnCommit(StatusCodesProtos.StatusIds.StatusCode statusCode) { + private void checkTxErrorOnCommit(RetryOptions retryOptions, + StatusCodesProtos.StatusIds.StatusCode statusCode, + Class exceptionType) { YdbRepository proxiedRepository = new YdbRepository(getProxyServerConfig()); try { - RepositoryTransaction tx = proxiedRepository.startTransaction(); + RepositoryTransaction tx = proxiedRepository.startTransaction( + TxOptions.create(IsolationLevel.SERIALIZABLE_READ_WRITE) + .withRetryOptions(retryOptions) + ); tx.table(Project.class).findAll(); runWithModifiedStatusCode( statusCode, - () -> assertThatExceptionOfType(UnavailableException.class) + () -> assertThatExceptionOfType(exceptionType) .isThrownBy(tx::commit) ); } finally { diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/ConditionalRetryMode.java b/repository/src/main/java/tech/ydb/yoj/repository/db/ConditionalRetryMode.java new file mode 100644 index 00000000..9be8c84e --- /dev/null +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/ConditionalRetryMode.java @@ -0,0 +1,25 @@ +package tech.ydb.yoj.repository.db; + +import tech.ydb.yoj.ExperimentalApi; + +/** + * Specified how to retry YOJ tx on a {@link tech.ydb.yoj.repository.db.exception.ConditionallyRetryableException conditionally-retryable} error. + *

The YOJ default is {@link #UNTIL_COMMIT}: the whole transaction body will be retried if a commit has not yet been attempted, or read-only + * or scan mode is used. + */ +@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/165") +public enum ConditionalRetryMode { + /** + * Never retry conditionally-retryable errors, even if the transaction commit has not yet been attempted. + */ + NEVER, + /** + * Retry the whole transaction body on a conditionally-retryable error, but only if transaction commit has not yet been attempted, + * or read-only or scan mode is used. + */ + UNTIL_COMMIT, + /** + * Retry the whole transaction body on a conditionally-retryable error, even if it occurred on a transaction commit. + */ + ALWAYS, +} diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/RepositoryTransaction.java b/repository/src/main/java/tech/ydb/yoj/repository/db/RepositoryTransaction.java index e0c9c145..627376ef 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/RepositoryTransaction.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/RepositoryTransaction.java @@ -2,6 +2,8 @@ import tech.ydb.yoj.repository.db.cache.TransactionLocal; import tech.ydb.yoj.repository.db.exception.OptimisticLockException; +import tech.ydb.yoj.repository.db.exception.RepositoryException; +import tech.ydb.yoj.repository.db.exception.RetryableExceptionBase; /** * A DB transaction. Each instance must be closed with {@link #commit()} or {@link #rollback} methods @@ -17,8 +19,12 @@ public interface RepositoryTransaction { * transaction statement has thrown an exception, because it means that transaction didn't 'execute normally'. * * @throws OptimisticLockException if the transaction's optimistic attempt has failed and it ought to be started over + * @throws tech.ydb.yoj.repository.db.exception.RetryableExceptionBase on other retryable and conditionally-retryable DB errors + * @throws tech.ydb.yoj.repository.db.exception.RepositoryException on other non-retryable DB errors + * + * @see #wasCommitAttempted() */ - void commit() throws OptimisticLockException; + void commit() throws OptimisticLockException, RetryableExceptionBase, RepositoryException; /** * Rollbacks that transaction. This method must be called in the end unless {@link #commit()} method was chosen for calling. @@ -31,8 +37,16 @@ public interface RepositoryTransaction { *

* (Note, that consistency is only checked if the transaction has 'executed normally', i.e. the last statement didn't throw an exception. * Otherwise this method always completes normally.) + * + * @throws tech.ydb.yoj.repository.db.exception.RetryableExceptionBase on retryable and conditionally-retryable DB errors + * @throws tech.ydb.yoj.repository.db.exception.RepositoryException on non-retryable DB errors + */ + void rollback() throws RetryableExceptionBase, RepositoryException; + + /** + * {@code true} if an attempt to {@link #commit()} the transaction was made; {@code false} otherwise */ - void rollback() throws OptimisticLockException; + boolean wasCommitAttempted(); TransactionLocal getTransactionLocal(); diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/StdTxManager.java b/repository/src/main/java/tech/ydb/yoj/repository/db/StdTxManager.java index 1941c2e5..5270b109 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/StdTxManager.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/StdTxManager.java @@ -14,8 +14,11 @@ import org.slf4j.LoggerFactory; import org.slf4j.MDC; import tech.ydb.yoj.DeprecationWarnings; +import tech.ydb.yoj.ExperimentalApi; import tech.ydb.yoj.repository.db.cache.TransactionLog; +import tech.ydb.yoj.repository.db.exception.ConditionallyRetryableException; import tech.ydb.yoj.repository.db.exception.RetryableException; +import tech.ydb.yoj.repository.db.exception.RetryableExceptionBase; import tech.ydb.yoj.util.lang.CallStack; import tech.ydb.yoj.util.lang.Strings; @@ -85,6 +88,9 @@ public final class StdTxManager implements TxManager, TxManagerState { private static final Counter retries = Counter.build("tx_retries", "Tx retry reasons") .labelNames("tx_name", "reason") .register(); + private static final Counter conditionalRetries = Counter.build("tx_conditional_retries", "Tx conditional retry reasons") + .labelNames("tx_name", "reason") + .register(); private static final AtomicLong txLogIdSeq = new AtomicLong(); @Getter @@ -195,6 +201,12 @@ public TxManager withLogStatementOnSuccess(boolean logStatementOnSuccess) { return withOptions(this.options.withLogStatementOnSuccess(logStatementOnSuccess)); } + @Override + @ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/165") + public TxManager withRetryOptions(@NonNull TxOptions.RetryOptions retryOptions) { + return withOptions(this.options.withRetryOptions(retryOptions)); + } + @Override public ReadonlyBuilder readOnly() { return new ReadonlyBuilderImpl(this.options.withIsolationLevel(ONLINE_CONSISTENT_READ_ONLY)); @@ -224,7 +236,7 @@ public T tx(Supplier supplier) { } private T txImpl(Supplier supplier) { - RetryableException lastRetryableException = null; + RetryableExceptionBase lastRetryableException = null; TxImpl lastTx = null; try (Timer ignored = totalDuration.labels(name).startTimer()) { for (int attempt = 1; attempt <= maxAttemptCount; attempt++) { @@ -249,6 +261,18 @@ private T txImpl(Supplier supplier) { if (attempt + 1 <= maxAttemptCount) { e.sleep(attempt); } + } catch (ConditionallyRetryableException e) { + boolean commitAttempted = lastTx != null && lastTx.getRepositoryTransaction().wasCommitAttempted(); + if (options.canConditionallyRetry(commitAttempted)) { + conditionalRetries.labels(name, getExceptionNameForMetric(e)).inc(); + lastRetryableException = e; + if (attempt + 1 <= maxAttemptCount) { + e.sleep(attempt); + } + } else { + results.labels(name, "rollback").inc(); + throw e.failImmediately(); + } } catch (Exception e) { results.labels(name, "rollback").inc(); throw e; @@ -272,15 +296,13 @@ private static void checkSeparatePolicy(SeparatePolicy separatePolicy, String tx switch (separatePolicy) { case ALLOW -> { } - case STRICT -> - throw new IllegalStateException(format("Transaction %s was run when another transaction is active", txName)); - case LOG -> - log.warn("Transaction '{}' was run when another transaction is active. Perhaps unexpected behavior. " + - "Use TxManager.separate() to avoid this message", txName); + case STRICT -> throw new IllegalStateException(format("Transaction %s was run when another transaction is active", txName)); + case LOG -> log.warn("Transaction '{}' was run when another transaction is active. Perhaps unexpected behavior. " + + "Use TxManager.separate() to avoid this message", txName); } } - private String getExceptionNameForMetric(RetryableException e) { + private static String getExceptionNameForMetric(Exception e) { return Strings.removeSuffix(e.getClass().getSimpleName(), "Exception"); } diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/TxManager.java b/repository/src/main/java/tech/ydb/yoj/repository/db/TxManager.java index dde78ba9..fed0cb45 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/TxManager.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/TxManager.java @@ -2,6 +2,7 @@ import lombok.NonNull; import tech.ydb.yoj.ExperimentalApi; +import tech.ydb.yoj.repository.db.TxOptions.RetryOptions; import tech.ydb.yoj.repository.db.cache.TransactionLog; import tech.ydb.yoj.repository.db.exception.DeadlineExceededException; import tech.ydb.yoj.repository.db.exception.OptimisticLockException; @@ -194,6 +195,16 @@ default TxManager noQueryTracing() { @ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/162") TxManager withTracingFilter(@NonNull QueryTracingFilter tracingFilter); + /** + * Experimental API: Configures advanced retry options for this {@code TxManager}, e.g., retrying the transaction + * if it fails with a conditionally-retryable error on commit (and it's not known whether it was committed or not). + * + * @param retryOptions advanced retry options to use + * @return {@code TxManager} instance with retry options set + */ + @ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/165") + TxManager withRetryOptions(@NonNull RetryOptions retryOptions); + /** * Performs the specified action inside a transaction. The action must be idempotent, because it might be executed * multiple times in case of {@link OptimisticLockException transaction lock @@ -214,13 +225,16 @@ default TxManager noQueryTracing() { void tx(Runnable runnable); /** - * Start a transaction-like session of read-only statements. Each statement will be executed separately, - * with the specified isolation level (online consistent read-only, by default). - *

YDB doesn't currently support multi-statement read-only transactions. If you perform more than one read, be - * ready to handle potential inconsistencies between the reads. - *

You can also use {@code readOnly().run(() -> [table].readTable(...));} to efficiently read data from the table - * without interfering with OLTP transactions. In this case, data consistency is similar to snapshot isolation. If - * perform more than one {@code readTable()}, be ready to handle potential inconsistencies between the reads. + * Start a transaction-like session of read-only statements. + *

    + *
  • If you need a true multi-statement read-only transaction, use the {@link IsolationLevel#SNAPSHOT snapshot isolation} + * level by calling {@code readOnly().withStatementIsolationLevel(SNAPSHOT).run(your lambda...)}.
  • + *
  • On a non-{@code SNAPSHOT} read-only isolation level (which is the default!), you must be ready to handle + * potential inconsistencies between the reads, because every query is essentially in its own read-only transaction.
  • + *
  • You can also use {@code readOnly().run(() -> [table].readTable(...));} to efficiently read data from the table + * without interfering with OLTP transactions. In this case, data consistency is similar to snapshot isolation. + * If you perform more than one {@code readTable()} call, be ready to handle potential inconsistencies between the reads.
  • + *
*/ ReadonlyBuilder readOnly(); diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/TxOptions.java b/repository/src/main/java/tech/ydb/yoj/repository/db/TxOptions.java index 889b3597..5b8cc444 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/TxOptions.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/TxOptions.java @@ -3,9 +3,11 @@ import lombok.AccessLevel; import lombok.Builder; import lombok.NonNull; +import lombok.RequiredArgsConstructor; import lombok.Value; import lombok.With; import tech.ydb.yoj.ExperimentalApi; +import tech.ydb.yoj.InternalApi; import tech.ydb.yoj.repository.db.cache.TransactionLog; import java.time.Duration; @@ -31,6 +33,10 @@ public class TxOptions { TimeoutOptions timeoutOptions; + @NonNull + @ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/165") + RetryOptions retryOptions; + boolean dryRun; boolean immediateWrites; @@ -53,6 +59,7 @@ public static TxOptions create(@NonNull IsolationLevel isolationLevel) { .firstLevelCache(true) .logLevel(TransactionLog.Level.DEBUG) .logStatementOnSuccess(true) + .retryOptions(RetryOptions.DEFAULT) .build(); } @@ -76,6 +83,17 @@ public boolean isScan() { return scanOptions != null; } + @InternalApi + @ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/165") + public boolean canConditionallyRetry(boolean commitAttempted) { + return switch (retryOptions.getConditionalRetryMode()) { + case ALWAYS -> true; + case NEVER -> false; + case UNTIL_COMMIT -> !commitAttempted // no commit() was attempted + || isReadOnly() || isScan(); // transaction is read-only (so commit() only frees up resources) + }; + } + public TimeoutOptions minTimeoutOptions(Duration timeoutFromExternalCtx) { if (timeoutFromExternalCtx == null && timeoutOptions == null) { return TimeoutOptions.DEFAULT; @@ -98,6 +116,7 @@ public TimeoutOptions minTimeoutOptions(Duration timeoutFromExternalCtx) { @Value @With + @Builder public static class TimeoutOptions { public static final TimeoutOptions DEFAULT = new TimeoutOptions(Duration.ofMinutes(5)); @@ -129,6 +148,7 @@ public Long getDeadlineAfter() { @Value @With + @Builder public static class ScanOptions { public static final ScanOptions DEFAULT = new ScanOptions(10_000, Duration.ofMinutes(5), false); @@ -136,4 +156,18 @@ public static class ScanOptions { Duration timeout; boolean useNewSpliterator; } + + @Value + @With + @Builder + @RequiredArgsConstructor(access = AccessLevel.PRIVATE) + @ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/165") + public static class RetryOptions { + public static final RetryOptions DEFAULT = new RetryOptions(ConditionalRetryMode.UNTIL_COMMIT); + + /** + * Retry mode for {@link tech.ydb.yoj.repository.db.exception.ConditionallyRetryableException conditionally-retryable} errors. + */ + ConditionalRetryMode conditionalRetryMode; + } } diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/exception/ConditionallyRetryableException.java b/repository/src/main/java/tech/ydb/yoj/repository/db/exception/ConditionallyRetryableException.java new file mode 100644 index 00000000..feabec51 --- /dev/null +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/exception/ConditionallyRetryableException.java @@ -0,0 +1,41 @@ +package tech.ydb.yoj.repository.db.exception; + +import tech.ydb.yoj.ExperimentalApi; +import tech.ydb.yoj.util.retry.RetryPolicy; + +/** + * Base class for conditionally-retryable database access exceptions. + * These correspond to indeterminate request states indicated by the database or by the underlying client/transport layers, when it's impossible + * to tell if the request has reached the database and has been executed: request timeouts, transport unavailability, client-level cancellation etc. + *

It's context-dependent whether the transaction can be retried safely after a {@code ConditionallyRetryableException}. + * By default, YOJ errs on the side of caution, and retries the transaction only if a commit has not been attempted, or if + * the transaction is in read-only or scan mode. + *

To customize retry behavior for conditionally-retryable exceptions, set the + * {@link tech.ydb.yoj.repository.db.TxManager#withRetryOptions(tech.ydb.yoj.repository.db.TxOptions.RetryOptions) TxManager's RetryOptions}. + * To ensure that retries are be safe, consider the broader context of your entire application. + *
E.g., if you're performing a {@code save()} (which is idempotent; saving the same entity multiple times does not change it), + * you have to additionally check that saving that particular entity with that specific state makes sense: check if the entity has already been saved, + * and if it has been, that saving this specific state of the entity makes sense, because another transaction might have saved a more relevant state. + * + * @see tech.ydb.yoj.repository.db.TxManager#withRetryOptions(tech.ydb.yoj.repository.db.TxOptions.RetryOptions) + * @see tech.ydb.yoj.repository.db.TxOptions.RetryOptions + */ +@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/165") +public abstract non-sealed class ConditionallyRetryableException extends RetryableExceptionBase { + protected ConditionallyRetryableException(String message, RetryPolicy retryPolicy, Throwable cause) { + super(message, retryPolicy, cause); + } + + protected ConditionallyRetryableException(String message, RetryPolicy retryPolicy) { + super(message, retryPolicy); + } + + @Override + public RepositoryException rethrow() { + return UnavailableException.afterRetries("Conditional retries failed", this); + } + + public RepositoryException failImmediately() { + return new UnavailableException("Conditional retries not attempted", this); + } +} diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/exception/EntityAlreadyExistsException.java b/repository/src/main/java/tech/ydb/yoj/repository/db/exception/EntityAlreadyExistsException.java index 21e4bd8d..64269938 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/exception/EntityAlreadyExistsException.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/exception/EntityAlreadyExistsException.java @@ -1,8 +1,10 @@ package tech.ydb.yoj.repository.db.exception; +import tech.ydb.yoj.util.retry.RetryPolicy; + public class EntityAlreadyExistsException extends RetryableException { public EntityAlreadyExistsException(String message) { - super(message); + super(message, RetryPolicy.retryImmediately()); } @Override diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/exception/OptimisticLockException.java b/repository/src/main/java/tech/ydb/yoj/repository/db/exception/OptimisticLockException.java index 9bbfad55..23110337 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/exception/OptimisticLockException.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/exception/OptimisticLockException.java @@ -1,11 +1,13 @@ package tech.ydb.yoj.repository.db.exception; +import tech.ydb.yoj.util.retry.RetryPolicy; + public class OptimisticLockException extends RetryableException { public OptimisticLockException(String message) { - super(message); + super(message, RetryPolicy.retryImmediately()); } public OptimisticLockException(String message, Throwable cause) { - super(message, cause); + super(message, RetryPolicy.retryImmediately(), cause); } } diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/exception/RetryableException.java b/repository/src/main/java/tech/ydb/yoj/repository/db/exception/RetryableException.java index 95ba195b..ade1f83c 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/exception/RetryableException.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/exception/RetryableException.java @@ -2,46 +2,19 @@ import tech.ydb.yoj.util.retry.RetryPolicy; -import static java.util.concurrent.TimeUnit.MILLISECONDS; - /** - * Base class for retryable database access exceptions. + * Base class for unconditionally retryable database access exceptions. */ -public abstract class RetryableException extends RepositoryException { - private final RetryPolicy retryPolicy; - +public abstract non-sealed class RetryableException extends RetryableExceptionBase { protected RetryableException(String message, RetryPolicy retryPolicy, Throwable cause) { - super(message, cause); - this.retryPolicy = retryPolicy; + super(message, retryPolicy, cause); } protected RetryableException(String message, RetryPolicy retryPolicy) { - super(message); - this.retryPolicy = retryPolicy; - } - - protected RetryableException(String message, Throwable cause) { - this(message, RetryPolicy.retryImmediately(), cause); - } - - protected RetryableException(String message) { - this(message, RetryPolicy.retryImmediately()); - } - - /** - * Sleeps for the recommended amount of time before retrying. - * - * @param attempt request attempt count (starting from 1) - */ - public void sleep(int attempt) { - try { - MILLISECONDS.sleep(retryPolicy.calcDuration(attempt).toMillis()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new QueryInterruptedException("DB query interrupted", e); - } + super(message, retryPolicy); } + @Override public RepositoryException rethrow() { return UnavailableException.afterRetries("Retries failed", this); } diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/exception/RetryableExceptionBase.java b/repository/src/main/java/tech/ydb/yoj/repository/db/exception/RetryableExceptionBase.java new file mode 100644 index 00000000..650b78d8 --- /dev/null +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/exception/RetryableExceptionBase.java @@ -0,0 +1,46 @@ +package tech.ydb.yoj.repository.db.exception; + +import lombok.Getter; +import tech.ydb.yoj.ExperimentalApi; +import tech.ydb.yoj.util.retry.RetryPolicy; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +/** + * Common base class for both {@link RetryableException unconditionally retryable} and {@link ConditionallyRetryableException conditionally retryable} + * database exceptions. + */ +@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/165") +public abstract sealed class RetryableExceptionBase extends RepositoryException permits RetryableException, ConditionallyRetryableException { + @Getter + private final RetryPolicy retryPolicy; + + protected RetryableExceptionBase(String message, RetryPolicy retryPolicy, Throwable cause) { + super(message, cause); + this.retryPolicy = retryPolicy; + } + + protected RetryableExceptionBase(String message, RetryPolicy retryPolicy) { + super(message); + this.retryPolicy = retryPolicy; + } + + /** + * Sleeps for the recommended amount of time before retrying. + * + * @param attempt request attempt count (starting from 1) + */ + public final void sleep(int attempt) { + try { + MILLISECONDS.sleep(retryPolicy.calcDuration(attempt).toMillis()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new QueryInterruptedException("DB query interrupted", e); + } + } + + /** + * @return exception to throw if retries have failed; must not be {@code null} + */ + public abstract RepositoryException rethrow(); +}