Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.aspectj.lang.annotation.Aspect;
import tech.ydb.yoj.repository.db.Tx;
import tech.ydb.yoj.repository.db.TxManager;
import tech.ydb.yoj.repository.db.exception.ConditionallyRetryableException;
import tech.ydb.yoj.repository.db.exception.RetryableException;

/**
Expand Down Expand Up @@ -68,7 +69,7 @@ private Object doInTransaction(ProceedingJoinPoint pjp, YojTransactional transac

return localTx.tx(() -> safeCall(pjp));
}
} catch (CallRetryableException | CallException e) {
} catch (CallRetryableException | CallConditionallyRetryableException | CallException e) {
throw e.getCause();
}
}
Expand All @@ -88,17 +89,28 @@ Object safeCall(ProceedingJoinPoint pjp) {
return pjp.proceed();
} catch (RetryableException e) {
throw new CallRetryableException(e);
} catch (ConditionallyRetryableException e) {
throw new CallConditionallyRetryableException(e);
} catch (Throwable e) {
throw new CallException(e);
}
}

/**
* It's a hint for tx manager to retry was requested
* It's a hint for tx manager that an unconditional retry was requested
*/
static class CallRetryableException extends RetryableException {
CallRetryableException(RetryableException e) {
super(e.getMessage(), e.getCause());
super(e.getMessage(), e.getRetryPolicy(), e.getCause());
}
}

/**
* It's a hint for tx manager that a conditional retry was requested
*/
static class CallConditionallyRetryableException extends ConditionallyRetryableException {
CallConditionallyRetryableException(ConditionallyRetryableException e) {
super(e.getMessage(), e.getRetryPolicy(), e.getCause());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import java.util.function.Supplier;

public class InMemoryRepositoryTransaction implements BaseDb, RepositoryTransaction {
private final static AtomicLong txIdGenerator = new AtomicLong();
private static final String CLOSE_ACTION_COMMIT = "commit()";
private static final String CLOSE_ACTION_ROLLBACK = "rollback()";

private static final AtomicLong txIdGenerator = new AtomicLong();

private final long txId = txIdGenerator.incrementAndGet();
private final Stopwatch txStopwatch = Stopwatch.createStarted();
Expand Down Expand Up @@ -81,7 +84,12 @@ public void commit() {
if (isBadSession) {
throw new IllegalStateException("Transaction was invalidated. Commit isn't possible");
}
endTransaction("commit()", this::commitImpl);
endTransaction(CLOSE_ACTION_COMMIT, this::commitImpl);
}

@Override
public boolean wasCommitAttempted() {
return CLOSE_ACTION_COMMIT.equals(closeAction);
}

private void commitImpl() {
Expand All @@ -101,14 +109,15 @@ private void commitImpl() {

@Override
public void rollback() {
endTransaction("rollback()", this::rollbackImpl);
endTransaction(CLOSE_ACTION_ROLLBACK, this::rollbackImpl);
}

private void rollbackImpl() {
storage.rollback(txId);
}

private void endTransaction(String action, Runnable runnable) {
ensureTransactionActive();
try {
if (isFinalActionNeeded(action)) {
logTransaction(action, runnable);
Expand All @@ -134,6 +143,7 @@ private boolean isFinalActionNeeded(String action) {
final <T extends Entity<T>> void doInWriteTransaction(
String log, TableDescriptor<T> tableDescriptor, Consumer<WriteTxDataShard<T>> consumer
) {
ensureTransactionActive();
if (options.isScan()) {
throw new IllegalTransactionScanException("Mutable operations");
}
Expand All @@ -158,6 +168,7 @@ final <T extends Entity<T>> void doInWriteTransaction(
final <T extends Entity<T>, R> R doInTransaction(
String action, TableDescriptor<T> tableDescriptor, Function<ReadOnlyTxDataShard<T>, R> func
) {
ensureTransactionActive();
return logTransaction(action, () -> {
InMemoryTxLockWatcher findWatcher = hasWrites ? watcher : InMemoryTxLockWatcher.NO_LOCKS;
ReadOnlyTxDataShard<T> shard = storage.getReadOnlyTxDataShard(
Expand All @@ -180,10 +191,6 @@ private void logTransaction(String action, Runnable runnable) {
}

private <R> R logTransaction(String action, Supplier<R> supplier) {
if (closeAction != null) {
throw new IllegalStateException("Transaction already closed by " + closeAction);
}

Stopwatch sw = Stopwatch.createStarted();
try {
R result = supplier.get();
Expand All @@ -195,6 +202,12 @@ private <R> R logTransaction(String action, Supplier<R> supplier) {
}
}

private void ensureTransactionActive() {
if (closeAction != null) {
throw new IllegalStateException("Transaction already closed by " + closeAction);
}
}

private String printResult(Object result) {
if (result instanceof Iterable<?>) {
long size = Iterables.size((Iterable<?>) result);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package tech.ydb.yoj.repository.ydb.exception;

import tech.ydb.yoj.repository.db.exception.RetryableException;
import tech.ydb.yoj.util.retry.RetryPolicy;

/**
* Tried to use a no longer active or valid YDB session, e.g. on a node that is now down.
*/
public class BadSessionException extends RetryableException {
public BadSessionException(String message) {
super(message);
super(message, RetryPolicy.retryImmediately());
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package tech.ydb.yoj.repository.ydb.exception;

import lombok.Getter;
import tech.ydb.yoj.ExperimentalApi;
import tech.ydb.yoj.repository.db.exception.ConditionallyRetryableException;
import tech.ydb.yoj.util.lang.Strings;
import tech.ydb.yoj.util.retry.RetryPolicy;

/**
* Base class for <em>conditionally-retryable</em> exceptions from the YDB database, the YDB Java SDK, and the GRPC client used by the YDB Java SDK.
*
* @see ConditionallyRetryableException Conditionally-retryable Exceptions
*/
@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/165")
public class YdbConditionallyRetryableException extends ConditionallyRetryableException {
private static final RetryPolicy UNDETERMINED_BACKOFF = RetryPolicy.expBackoff(5L, 500L, 0.1, 2.0);

@Getter
private final Enum<?> statusCode;

public YdbConditionallyRetryableException(String message, Enum<?> statusCode, Object request, Object response) {
this(message, statusCode, request, response, UNDETERMINED_BACKOFF);
}

public YdbConditionallyRetryableException(String message, Enum<?> statusCode, Object request, Object response, RetryPolicy retryPolicy) {
super(Strings.join("\n", "[" + statusCode + "] " + message, request, response), retryPolicy);
this.statusCode = statusCode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
import tech.ydb.yoj.repository.db.exception.RetryableException;
import tech.ydb.yoj.repository.db.exception.UnavailableException;
import tech.ydb.yoj.util.lang.Strings;
import tech.ydb.yoj.util.retry.RetryPolicy;

/**
* YDB authentication failure, possibly a transient one. E.g., used a recently expired token.
*/
public class YdbUnauthenticatedException extends RetryableException {
public YdbUnauthenticatedException(Object request, Object response) {
super(Strings.join("\n", request, response));
super(Strings.join("\n", request, response), RetryPolicy.retryImmediately());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
import tech.ydb.yoj.repository.db.exception.RetryableException;
import tech.ydb.yoj.repository.db.exception.UnavailableException;
import tech.ydb.yoj.util.lang.Strings;
import tech.ydb.yoj.util.retry.RetryPolicy;

/**
* YDB authorization failure, possibly a transient one. E.g., the principal tried to write to the database but has no
* write-allowing role assigned.
*/
public class YdbUnauthorizedException extends RetryableException {
public YdbUnauthorizedException(Object request, Object response) {
super(Strings.join("\n", request, response));
super(Strings.join("\n", request, response), RetryPolicy.retryImmediately());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static tech.ydb.yoj.repository.ydb.client.YdbValidator.checkGrpcContextStatus;
import static tech.ydb.yoj.repository.ydb.client.YdbValidator.checkGrpcTimeoutAndCancellation;
import static tech.ydb.yoj.util.lang.Interrupts.isThreadInterrupted;

@InternalApi
Expand All @@ -40,7 +40,7 @@ private static RepositoryException convertToUnavailable(Throwable ex) {
Thread.currentThread().interrupt();
return new QueryInterruptedException("DB query interrupted", ex);
}
checkGrpcContextStatus(ex.getMessage(), ex);
checkGrpcTimeoutAndCancellation(ex.getMessage(), ex);

return new UnavailableException("DB is unavailable", ex);
}
Expand Down
Loading
Loading