diff --git a/repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java b/repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java index 3f552e8a..90960be6 100644 --- a/repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java +++ b/repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java @@ -380,7 +380,7 @@ private static > ReadTableParams defaultReadTablePar } private static > ReadTableParams.ReadTableParamsBuilder buildReadTableParamsNonLegacy() { - return ReadTableParams.builder().useNewSpliterator(true); + return ReadTableParams.builder().spliteratorType(ReadTableParams.SpliteratorType.EXPERIMENTAL); } @Test diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java index b3e86561..206aff3c 100644 --- a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java @@ -15,6 +15,7 @@ import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.StatusCode; +import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.proto.ValueProtos; import tech.ydb.table.Session; import tech.ydb.table.query.DataQueryResult; @@ -25,6 +26,7 @@ import tech.ydb.table.query.stats.QueryStats; import tech.ydb.table.query.stats.QueryStatsCollectionMode; import tech.ydb.table.query.stats.TableAccessStats; +import tech.ydb.table.query.ReadTablePart; import tech.ydb.table.result.ResultSetReader; import tech.ydb.table.settings.BulkUpsertSettings; import tech.ydb.table.settings.CommitTxSettings; @@ -70,6 +72,14 @@ import tech.ydb.yoj.repository.ydb.exception.YdbRepositoryException; import tech.ydb.yoj.repository.ydb.merge.QueriesMerger; import tech.ydb.yoj.repository.ydb.readtable.ReadTableMapper; +import tech.ydb.yoj.repository.ydb.spliterator.ClosableSpliterator; +import tech.ydb.yoj.repository.ydb.spliterator.ResultSetIterator; +import tech.ydb.yoj.repository.ydb.spliterator.YdbSpliterator; +import tech.ydb.yoj.repository.ydb.spliterator.queue.YojQueue; +import tech.ydb.yoj.repository.ydb.spliterator.YdbSpliteratorQueueGrpcStreamAdapter; +import tech.ydb.yoj.repository.ydb.spliterator.legacy.YdbLegacySpliterator; +import tech.ydb.yoj.repository.ydb.spliterator.legacy.YdbNewLegacySpliterator; +import tech.ydb.yoj.repository.ydb.spliterator.queue.YojQueueImpl; import tech.ydb.yoj.repository.ydb.statement.Statement; import tech.ydb.yoj.repository.ydb.table.YdbTable; import tech.ydb.yoj.util.lang.Interrupts; @@ -78,6 +88,7 @@ import javax.annotation.Nullable; import java.time.Duration; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -101,7 +112,7 @@ public class YdbRepositoryTransaction private static final String PROP_TRACE_VERBOSE_OBJ_RESULTS = "tech.ydb.yoj.repository.ydb.trace.verboseObjResults"; private final List> pendingWrites = new ArrayList<>(); - private final List> spliterators = new ArrayList<>(); + private final List> spliterators = new ArrayList<>(); @Getter private final TxOptions options; @@ -127,8 +138,8 @@ public YdbRepositoryTransaction(REPO repo, TxOptions options) { this.tablespace = repo.getSchemaOperations().getTablespace(); } - private YdbSpliterator createSpliterator(String request, boolean isOrdered) { - YdbSpliterator spliterator = new YdbSpliterator<>(request, isOrdered); + private YdbNewLegacySpliterator createSpliterator(String request, boolean isOrdered) { + YdbNewLegacySpliterator spliterator = new YdbNewLegacySpliterator<>(request, isOrdered); spliterators.add(spliterator); return spliterator; } @@ -183,7 +194,7 @@ private void doCommit() { private void closeStreams() { Exception summaryException = null; - for (YdbSpliterator spliterator : spliterators) { + for (ClosableSpliterator spliterator : spliterators) { try { spliterator.close(); } catch (Exception e) { @@ -451,7 +462,7 @@ public Stream executeScanQuery(Statement spliterator = createSpliterator("scanQuery: " + yql, false); + YdbNewLegacySpliterator spliterator = createSpliterator("scanQuery: " + yql, false); initSession(); session.executeScanQuery( @@ -559,38 +570,65 @@ public Stream readTable(ReadTableMapper settings.toKey(TupleValue.of(values), params.isToInclusive()); } - if (params.isUseNewSpliterator()) { - YdbSpliterator spliterator = createSpliterator("readTable: " + tableName, params.isOrdered()); + return switch (params.getSpliteratorType()) { + case LEGACY -> { + try { + YdbLegacySpliterator spliterator = new YdbLegacySpliterator<>(params.isOrdered(), action -> + doCall("read table " + mapper.getTableName(""), () -> { + Status status = YdbOperations.safeJoin( + session.readTable( + tableName, + settings.build(), + rs -> new ResultSetConverter(rs).stream(mapper::mapResult).forEach(action) + ), + params.getTimeout().plusMinutes(5) + ); + validate("readTable", status.getCode(), status.toString()); + }) + ); + yield spliterator.makeStream(); + } catch (RepositoryException e) { + throw e; + } catch (Exception e) { + throw new UnexpectedException("Could not read table " + tableName, e); + } + } + case LEGACY_SLOW -> { + YdbNewLegacySpliterator spliterator = createSpliterator("readTable: " + tableName, params.isOrdered()); - initSession(); - session.readTable( - tableName, settings.build(), - resultSet -> new ResultSetConverter(resultSet).stream(mapper::mapResult).forEach(spliterator::onNext) - ).whenComplete(spliterator::onSupplierThreadComplete); + initSession(); + session.readTable( + tableName, settings.build(), + resultSet -> new ResultSetConverter(resultSet).stream(mapper::mapResult).forEach(spliterator::onNext) + ).whenComplete(spliterator::onSupplierThreadComplete); - return spliterator.createStream(); - } + yield spliterator.createStream(); + } + case EXPERIMENTAL -> { + initSession(); + + // TODO: configure stream timeout + // TODO: configure batch count + YojQueue> queue = YojQueueImpl.create(0, Duration.ofMinutes(5)); + + var adapter = new YdbSpliteratorQueueGrpcStreamAdapter<>("readTable: " + tableName, queue); + GrpcReadStream grpcStream = session.executeReadTable(tableName, settings.build()); + CompletableFuture future = grpcStream.start(readTablePart -> { + ResultSetIterator iterator = new ResultSetIterator<>( + readTablePart.getResultSetReader(), + mapper::mapResult + ); + adapter.onNext(iterator); + }); + future.whenComplete(adapter::onSupplierThreadComplete); - try { - YdbLegacySpliterator spliterator = new YdbLegacySpliterator<>(params.isOrdered(), action -> - doCall("read table " + mapper.getTableName(""), () -> { - Status status = YdbOperations.safeJoin( - session.readTable( - tableName, - settings.build(), - rs -> new ResultSetConverter(rs).stream(mapper::mapResult).forEach(action) - ), - params.getTimeout().plusMinutes(5) - ); - validate("readTable", status.getCode(), status.toString()); - }) - ); - return spliterator.makeStream(); - } catch (RepositoryException e) { - throw e; - } catch (Exception e) { - throw new UnexpectedException("Could not read table " + tableName, e); - } + YdbSpliterator spliterator = new YdbSpliterator<>(queue, params.isOrdered()); + + spliterators.add(spliterator); + + yield spliterator.createStream(); + } + }; } /** diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/ClosableSpliterator.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/ClosableSpliterator.java new file mode 100644 index 00000000..8a6ddec9 --- /dev/null +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/ClosableSpliterator.java @@ -0,0 +1,7 @@ +package tech.ydb.yoj.repository.ydb.spliterator; + +import java.util.Spliterator; + +public interface ClosableSpliterator extends Spliterator { + void close(); +} diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/ResultSetIterator.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/ResultSetIterator.java new file mode 100644 index 00000000..ca9fa395 --- /dev/null +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/ResultSetIterator.java @@ -0,0 +1,73 @@ +package tech.ydb.yoj.repository.ydb.spliterator; + +import tech.ydb.proto.ValueProtos; +import tech.ydb.table.result.ResultSetReader; +import tech.ydb.yoj.repository.ydb.client.YdbConverter; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +public final class ResultSetIterator implements Iterator { + private final ResultSetReader resultSet; + private final ResultConverter converter; + private final List columns; + + private int position = 0; + + public ResultSetIterator(ResultSetReader resultSet, ResultConverter converter) { + List columns; + if (resultSet.getRowCount() > 0) { + resultSet.setRowIndex(0); + columns = getColumns(resultSet); + } else { + columns = new ArrayList<>(); + } + + this.resultSet = resultSet; + this.converter = converter; + this.columns = columns; + } + + @Override + public boolean hasNext() { + return position < resultSet.getRowCount(); + } + + @Override + public V next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + ValueProtos.Value value = buildValue(position++); + + return converter.convert(columns, value); + } + + private ValueProtos.Value buildValue(int rowIndex) { + resultSet.setRowIndex(rowIndex); + ValueProtos.Value.Builder value = ValueProtos.Value.newBuilder(); + for (int i = 0; i < columns.size(); i++) { + value.addItems(YdbConverter.convertValueToProto(resultSet.getColumn(i))); + } + return value.build(); + } + + private static List getColumns(ResultSetReader resultSet) { + List columns = new ArrayList<>(); + for (int i = 0; i < resultSet.getColumnCount(); i++) { + columns.add(ValueProtos.Column.newBuilder() + .setName(resultSet.getColumnName(i)) + .build() + ); + } + return columns; + } + + @FunctionalInterface + public interface ResultConverter { + V convert(List columns, ValueProtos.Value value); + } +} diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/YdbSpliterator.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/YdbSpliterator.java new file mode 100644 index 00000000..bfdc0eca --- /dev/null +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/YdbSpliterator.java @@ -0,0 +1,81 @@ +package tech.ydb.yoj.repository.ydb.spliterator; + +import tech.ydb.yoj.repository.ydb.spliterator.queue.YojQueue; +import tech.ydb.yoj.repository.ydb.spliterator.queue.YojSpliteratorQueue; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Spliterator; +import java.util.function.Consumer; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +public final class YdbSpliterator implements ClosableSpliterator { + private final YojSpliteratorQueue> queue; + private final int flags; + + private Iterator valueIterator = Collections.emptyIterator(); + + private boolean closed = false; + + public YdbSpliterator(YojQueue> queue, boolean isOrdered) { + this.queue = queue; + this.flags = (isOrdered ? ORDERED : 0) | NONNULL; + } + + // Correct way to create stream with YdbSpliterator. onClose call is important for avoid supplier thread leak. + public Stream createStream() { + return StreamSupport.stream(this, false).onClose(this::close); + } + + @Override + public void close() { + if (closed) { + return; + } + closed = true; + queue.close(); + } + + @Override + public boolean tryAdvance(Consumer action) { + if (closed) { + return false; + } + + // queue could return empty iterator, we have to select one with elements + while (!valueIterator.hasNext()) { + valueIterator = queue.poll(); + if (valueIterator == null) { + close(); + return false; + } + } + + V value = valueIterator.next(); + + action.accept(value); + + return true; + } + + @Override + public Spliterator trySplit() { + return null; + } + + @Override + public long estimateSize() { + return Long.MAX_VALUE; + } + + @Override + public long getExactSizeIfKnown() { + return -1; + } + + @Override + public int characteristics() { + return flags; + } +} diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/YdbSpliteratorQueueGrpcStreamAdapter.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/YdbSpliteratorQueueGrpcStreamAdapter.java new file mode 100644 index 00000000..c51dd3d4 --- /dev/null +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/YdbSpliteratorQueueGrpcStreamAdapter.java @@ -0,0 +1,63 @@ +package tech.ydb.yoj.repository.ydb.spliterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import tech.ydb.core.Status; +import tech.ydb.yoj.repository.ydb.YdbOperations; +import tech.ydb.yoj.repository.ydb.spliterator.queue.OfferDeadlineExceededException; +import tech.ydb.yoj.repository.ydb.spliterator.queue.YojQueue; +import tech.ydb.yoj.repository.ydb.spliterator.queue.YojSupplierQueue; + +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; + +import static tech.ydb.yoj.repository.ydb.client.YdbValidator.validate; + +public final class YdbSpliteratorQueueGrpcStreamAdapter { + private static final Logger log = LoggerFactory.getLogger(YdbSpliteratorQueueGrpcStreamAdapter.class); + + private final String request; + private final YojSupplierQueue queue; + + public YdbSpliteratorQueueGrpcStreamAdapter(String request, YojQueue queue) { + this.request = request; + this.queue = queue; + } + + public void onNext(V values) { + if (!queue.offer(values)) { + // Need to abort supplier thread if stream is closed. onSupplierThreadComplete will exit immediately. + // ConsumerDoneException isn't handled because onSupplierThreadComplete will exit by this.closed. + throw ConsumerDoneException.INSTANCE; + } + } + + // (supplier thread) Send knowledge to stream when data is over. + public void onSupplierThreadComplete(Status status, Throwable ex) { + var error = unwrapException(ex); + if (queue.isClosed() || error instanceof OfferDeadlineExceededException) { + log.error("Supplier thread was closed because consumer didn't poll an element of stream on timeout"); + // If deadline exceeded happen, need to do nothing. Stream thread will exit at deadline by themself. + return; + } + + queue.supplierDone(() -> { + if (error != null) { + throw YdbOperations.convertToRepositoryException(error); + } + + validate(request, status.getCode(), status.toString()); + }); + } + + private static Throwable unwrapException(Throwable ex) { + if (ex instanceof CompletionException || ex instanceof ExecutionException) { + return ex.getCause(); + } + return ex; + } + + private static class ConsumerDoneException extends RuntimeException { + public final static ConsumerDoneException INSTANCE = new ConsumerDoneException(); + } +} diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbLegacySpliterator.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/legacy/YdbLegacySpliterator.java similarity index 96% rename from repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbLegacySpliterator.java rename to repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/legacy/YdbLegacySpliterator.java index 817700a1..48f50bb1 100644 --- a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbLegacySpliterator.java +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/legacy/YdbLegacySpliterator.java @@ -1,4 +1,4 @@ -package tech.ydb.yoj.repository.ydb; +package tech.ydb.yoj.repository.ydb.spliterator.legacy; import tech.ydb.yoj.InternalApi; diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbSpliterator.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/legacy/YdbNewLegacySpliterator.java similarity index 95% rename from repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbSpliterator.java rename to repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/legacy/YdbNewLegacySpliterator.java index 6b6903e1..39e9667a 100644 --- a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbSpliterator.java +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/legacy/YdbNewLegacySpliterator.java @@ -1,4 +1,4 @@ -package tech.ydb.yoj.repository.ydb; +package tech.ydb.yoj.repository.ydb.spliterator.legacy; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; @@ -8,6 +8,8 @@ import tech.ydb.yoj.InternalApi; import tech.ydb.yoj.repository.db.exception.DeadlineExceededException; import tech.ydb.yoj.repository.db.exception.QueryInterruptedException; +import tech.ydb.yoj.repository.ydb.YdbOperations; +import tech.ydb.yoj.repository.ydb.spliterator.ClosableSpliterator; import javax.annotation.Nullable; import java.time.Duration; @@ -36,9 +38,10 @@ * GitHub Issue #42. */ @InternalApi +@Deprecated // too slow @ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/42") -public class YdbSpliterator implements Spliterator { - private static final Logger log = LoggerFactory.getLogger(YdbSpliterator.class); +public class YdbNewLegacySpliterator implements ClosableSpliterator { + private static final Logger log = LoggerFactory.getLogger(YdbNewLegacySpliterator.class); private static final Duration DEFAULT_STREAM_WORK_TIMEOUT = Duration.ofMinutes(5); @@ -57,12 +60,12 @@ public class YdbSpliterator implements Spliterator { private boolean endData = false; - public YdbSpliterator(String request, boolean isOrdered) { + public YdbNewLegacySpliterator(String request, boolean isOrdered) { this(request, isOrdered, DEFAULT_STREAM_WORK_TIMEOUT); } @VisibleForTesting - protected YdbSpliterator(String request, boolean isOrdered, Duration streamWorkTimeout) { + protected YdbNewLegacySpliterator(String request, boolean isOrdered, Duration streamWorkTimeout) { this.flags = (isOrdered ? ORDERED : 0) | NONNULL; this.streamWorkDeadlineNanos = System.nanoTime() + TimeUnit.NANOSECONDS.toNanos(saturatedToNanos(streamWorkTimeout)); this.validateResponse = (status, error) -> { @@ -158,6 +161,7 @@ public boolean tryAdvance(Consumer action) { } // (stream thread) close spliterator and abort supplier thread + @Override public void close() { // close() can be called twice by stream.close() and in the end of transaction if (closed) { diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/queue/OfferDeadlineExceededException.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/queue/OfferDeadlineExceededException.java new file mode 100644 index 00000000..a138f5d7 --- /dev/null +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/queue/OfferDeadlineExceededException.java @@ -0,0 +1,4 @@ +package tech.ydb.yoj.repository.ydb.spliterator.queue; + +public final class OfferDeadlineExceededException extends RuntimeException { +} diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/queue/YojQueue.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/queue/YojQueue.java new file mode 100644 index 00000000..9b4f63c9 --- /dev/null +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/queue/YojQueue.java @@ -0,0 +1,4 @@ +package tech.ydb.yoj.repository.ydb.spliterator.queue; + +public interface YojQueue extends YojSpliteratorQueue, YojSupplierQueue { +} diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/queue/YojQueueImpl.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/queue/YojQueueImpl.java new file mode 100644 index 00000000..8b01ca0f --- /dev/null +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/queue/YojQueueImpl.java @@ -0,0 +1,135 @@ +package tech.ydb.yoj.repository.ydb.spliterator.queue; + +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import tech.ydb.yoj.repository.db.exception.DeadlineExceededException; +import tech.ydb.yoj.repository.db.exception.QueryInterruptedException; + +import javax.annotation.Nullable; +import java.time.Duration; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; + +public final class YojQueueImpl implements YojQueue { + private final BlockingQueue> queue; + private final long streamWorkDeadlineNanos; + + private volatile boolean closed = false; + + public static YojQueueImpl create(int maxQueueSize, Duration streamWorkTimeout) { + return new YojQueueImpl<>( + createQueue(maxQueueSize), + calculateDeadline(streamWorkTimeout) + ); + } + + private YojQueueImpl(BlockingQueue> queue, long streamWorkDeadlineNanos) { + this.queue = queue; + this.streamWorkDeadlineNanos = streamWorkDeadlineNanos; + } + + @Override + public boolean isClosed() { + return closed; + } + + // (grpc thread) Send values to user-stream. + @Override + public boolean offer(V value) { + return offerValueSupplier(() -> value); + } + + // (grpc thread) Send knowledge to user-stream when data is over (or error handled). + @Override + public void supplierDone(Runnable status) { + offerValueSupplier(() -> { + status.run(); + return null; + }); + } + + // (user thread) Get values from grpc-stream. Could be called only from one thread because of volatile closed variable + @Nullable + @Override + public V poll() { + if (closed) { + return null; + } + + Supplier valueSupplier = pollValueSupplier(); + if (valueSupplier == null) { + throw new DeadlineExceededException("Stream deadline exceeded on poll"); + } + + V value = valueSupplier.get(); + if (value == null) { + close(); + } + + return value; + } + + // (user thread) Could be called only from one thread with poll() because of volatile closed variable + @Override + public void close() { + if (closed) { + return; + } + closed = true; + queue.clear(); + } + + private boolean offerValueSupplier(Supplier valueSupplier) throws OfferDeadlineExceededException { + if (closed) { + return false; + } + + try { + if (!queue.offer(valueSupplier, calculateTimeout(), TimeUnit.NANOSECONDS)) { + throw new OfferDeadlineExceededException(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new QueryInterruptedException("Supplier thread interrupted", e); + } + + return !closed; + } + + @Nullable + private Supplier pollValueSupplier() { + try { + return queue.poll(calculateTimeout(), TimeUnit.NANOSECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new QueryInterruptedException("Consumer thread interrupted", e); + } + } + + private long calculateTimeout() { + return TimeUnit.NANOSECONDS.toNanos(streamWorkDeadlineNanos - System.nanoTime()); + } + + private static BlockingQueue> createQueue(int maxQueueSize) { + Preconditions.checkArgument(maxQueueSize >= 0, "maxQueueSize must be greater than 0"); + if (maxQueueSize == 0) { + return new SynchronousQueue<>(); + } + return new ArrayBlockingQueue<>(maxQueueSize); + } + + private static long calculateDeadline(Duration streamWorkTimeout) { + return System.nanoTime() + TimeUnit.NANOSECONDS.toNanos(saturatedToNanos(streamWorkTimeout)); + } + + // copy-paste from com.google.common.util.concurrent.Uninterruptibles + private static long saturatedToNanos(Duration duration) { + try { + return duration.toNanos(); + } catch (ArithmeticException ignore) { + return duration.isNegative() ? -9223372036854775808L : 9223372036854775807L; + } + } +} diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/queue/YojSpliteratorQueue.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/queue/YojSpliteratorQueue.java new file mode 100644 index 00000000..2d548a48 --- /dev/null +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/queue/YojSpliteratorQueue.java @@ -0,0 +1,12 @@ +package tech.ydb.yoj.repository.ydb.spliterator.queue; + +import javax.annotation.Nullable; + +public interface YojSpliteratorQueue { + // (user thread) Get values from grpc-stream. Could be called only from one thread because of volatile closed variable + @Nullable + V poll(); + + // (user thread) Could be called only from one thread with poll() because of volatile closed variable + void close(); +} diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/queue/YojSupplierQueue.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/queue/YojSupplierQueue.java new file mode 100644 index 00000000..40beda1b --- /dev/null +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/queue/YojSupplierQueue.java @@ -0,0 +1,11 @@ +package tech.ydb.yoj.repository.ydb.spliterator.queue; + +public interface YojSupplierQueue { + // (grpc thread) Send values to user-stream. + boolean offer(V value) throws OfferDeadlineExceededException; + + // (grpc thread) Send knowledge to user-stream when data is over (or error handled). + void supplierDone(Runnable status); + + boolean isClosed(); +} diff --git a/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbRepositoryIntegrationTest.java b/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbRepositoryIntegrationTest.java index da77ea7b..90338f75 100644 --- a/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbRepositoryIntegrationTest.java +++ b/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbRepositoryIntegrationTest.java @@ -202,7 +202,9 @@ public void useClosedReadTableStream() { }); - ReadTableParams params = ReadTableParams.builder().useNewSpliterator(true).build(); + ReadTableParams params = ReadTableParams.builder() + .spliteratorType(ReadTableParams.SpliteratorType.LEGACY_SLOW) + .build(); Stream readOnlyStream = db.readOnly().run(() -> db.projects().readTable(params)); assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> diff --git a/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbSpliteratorTest.java b/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/spliterator/legacy/YdbNewLegacySpliteratorTest.java similarity index 92% rename from repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbSpliteratorTest.java rename to repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/spliterator/legacy/YdbNewLegacySpliteratorTest.java index 97f1b3fd..ae9a1bd5 100644 --- a/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbSpliteratorTest.java +++ b/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/spliterator/legacy/YdbNewLegacySpliteratorTest.java @@ -1,4 +1,4 @@ -package tech.ydb.yoj.repository.ydb; +package tech.ydb.yoj.repository.ydb.spliterator.legacy; import com.google.common.util.concurrent.Runnables; import com.google.common.util.concurrent.Uninterruptibles; @@ -19,7 +19,7 @@ import static org.assertj.core.api.Assertions.assertThatExceptionOfType; -public class YdbSpliteratorTest { +public class YdbNewLegacySpliteratorTest { @SneakyThrows public static void doAfter(int millis, Runnable runnable) { Thread.sleep(millis); @@ -88,7 +88,7 @@ private static class ReadTableMock { private final AtomicInteger selectedValuesCount = new AtomicInteger(); - private final YdbSpliterator spliterator; + private final YdbNewLegacySpliterator spliterator; private final List bucketSizes = new ArrayList<>(); @@ -96,7 +96,7 @@ private static class ReadTableMock { private Status status = Status.SUCCESS; private Throwable exception = null; - private ReadTableMock(YdbSpliterator spliterator) { + private ReadTableMock(YdbNewLegacySpliterator spliterator) { this.spliterator = spliterator; } @@ -105,7 +105,7 @@ public static ReadTableMock start() { } public static ReadTableMock start(Duration timeout) { - YdbSpliterator spliterator = new YdbSpliterator<>("stream", false, timeout); + YdbNewLegacySpliterator spliterator = new YdbNewLegacySpliterator<>("stream", false, timeout); ReadTableMock mock = new ReadTableMock(spliterator); mock.run(); @@ -200,7 +200,7 @@ public void closeSupplierThreadWhenCloseOfLimitedStreamWasForgotten() { @Test @SneakyThrows public void endStreamWhenSupplerOfferValue() { - YdbSpliterator spliterator = new YdbSpliterator<>("stream", false, Duration.ofMillis(500)); + YdbNewLegacySpliterator spliterator = new YdbNewLegacySpliterator<>("stream", false, Duration.ofMillis(500)); spliterator.onNext(1); @@ -209,7 +209,7 @@ public void endStreamWhenSupplerOfferValue() { thread.start(); spliterator.onNext(2); - assertThatExceptionOfType(YdbSpliterator.ConsumerDoneException.class).isThrownBy(() -> + assertThatExceptionOfType(YdbNewLegacySpliterator.ConsumerDoneException.class).isThrownBy(() -> spliterator.onNext(3) ); diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/readtable/ReadTableParams.java b/repository/src/main/java/tech/ydb/yoj/repository/db/readtable/ReadTableParams.java index f1cb203d..b9eaf5ac 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/readtable/ReadTableParams.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/readtable/ReadTableParams.java @@ -17,15 +17,8 @@ public class ReadTableParams { int rowLimit; @Builder.Default Duration timeout = Duration.ofSeconds(60); - - /** - * Set this to {@code true} to use a {@code Spliterator} contract-conformant and less memory consuming implementation for the {@code Stream} - * returned by {@code readTable()}. - *

Note that using the new implementation currently has a negative performance impact, for more information refer to - * GitHub Issue #42. - */ - @ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/42") - boolean useNewSpliterator; + @Builder.Default + SpliteratorType spliteratorType = SpliteratorType.LEGACY; int batchLimitBytes; int batchLimitRows; @@ -48,4 +41,16 @@ public ReadTableParams.ReadTableParamsBuilder toKeyInclusive(ID toKey) { return toKey(toKey).toInclusive(true); } } + + public enum SpliteratorType { + LEGACY, + /** + * Set this to {@code true} to use a {@code Spliterator} contract-conformant and less memory consuming implementation for the {@code Stream} + * returned by {@code readTable()}. + *

Note that using the new implementation currently has a negative performance impact, for more information refer to + * GitHub Issue #42. + */ + LEGACY_SLOW, + EXPERIMENTAL, + } }