From 96366db6c63ca751137a61ee760dae072adb25af Mon Sep 17 00:00:00 2001 From: Sylvain Lebresne Date: Wed, 24 Sep 2025 15:46:19 +0200 Subject: [PATCH] CNDB-15483: CNDB-15300: Add `SSTableReader#getApproximatePositionsForRanges` (#1993) This PR is in the context of https://github.com/riptano/cndb/pull/15380, and is used by its PR https://github.com/riptano/cndb/pull/15380. It adds a variant of the `SSTableReader#getPositionsForRanges` method that never reads the data file to compute its results, but in exchange may return positions that slightly "overshoot" the requested range. Put another way, the added method `SSTableReader#getApproximatePositionsForRanges` is such that if you call it on some range `R`, and you read the data within the returned positions, then the read data may start with at most one key (partition, really) that sorts strictly before `R`, and may end with at most one key that sorts strictly after `R`. Additionally, the PR switches the reading of the `Statistics.db` component from using `RandomAccessReader` to using `FileInputStreamPlus`. This is essentially equivalent functionality-wise (since the component is deserialized sequentially anyway, there are no random reads), but by making it more "clear" that it doesn't do random reads, it allows us to "direct download" this component like other related components on the CNDB side. See the last point of https://github.com/riptano/cndb/pull/15380 for more details. 
--- .../io/sstable/format/SSTableReader.java | 45 +++++ .../io/sstable/format/bti/BtiTableReader.java | 71 ++++++++ .../sstable/metadata/MetadataSerializer.java | 13 +- .../io/sstable/SSTableReaderTest.java | 167 ++++++++++++++++++ .../metadata/MetadataSerializerTest.java | 17 +- 5 files changed, 298 insertions(+), 15 deletions(-) diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index f240d9a70a14..65d4a0a1df06 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -846,6 +846,45 @@ public List getPositionsForRanges(Collection + * By allowing some imprecision, this method may be faster/avoid reads to the data file that are otherwise + * necessary. In practice, the positions returned by this method may be up to "one key off" for each bound of + * each range. In other words, say we pass range `(t_s, t_e]`, and let's denote by `k_i` the ith key in the + * underlying sstable. And suppose that `getPositionsForRanges([(t_s, t_e]])` returns `(k_s, k_e)` (with `s` < `e`), + * where `k_i` means the position of the i-th key in the sstable. Then this method applied to this same range may return + * either one of `(k_s, k_e)` (same result), `(k_s-1, k_e)`, `(k_s, k_e+1)`, or `(k_s-1, k_e+1)` + *

+ * Also note that as a consequence of this, the returned list of position bounds may have some strict overlap + * (the method could return something along the lines of `[(0, 100), (80, 200)]`). But all the starting positions + * and all the ending positions will still be ordered. + */ + public List getApproximatePositionsForRanges(Collection> ranges) + { + List positions = new ArrayList<>(); + for (Range range : Range.normalize(ranges)) + { + assert !range.isWrapAround() || range.right.isMinimum(); + AbstractBounds bounds = Range.makeRowRange(range); + PartitionPositionBounds pb = getApproximatePositionsForBounds(bounds); + if (pb != null) + positions.add(pb); + } + return positions; + } + + /** + * This is to {@link #getPositionsForBounds(AbstractBounds)} what {@link #getApproximatePositionsForRanges(Collection)} + * is to {@link #getPositionsForRanges(Collection)}. Callers must handle a {@code null} result, as {@link #getApproximatePositionsForRanges(Collection)} does. + */ + public PartitionPositionBounds getApproximatePositionsForBounds(AbstractBounds bounds) + { + // Return the exact positions by default; this can be overridden by concrete sstable implementations. + return getPositionsForBounds(bounds); + } + /** * Get a list of data positions in this SSTable that correspond to the given list of bounds. This method will remove * non-covered intervals, but will not correct order or overlap in the supplied list, e.g. 
if bounds overlap, the @@ -2032,6 +2071,12 @@ public final boolean equals(Object o) PartitionPositionBounds that = (PartitionPositionBounds) o; return lowerPosition == that.lowerPosition && upperPosition == that.upperPosition; } + + @Override + public String toString() + { + return String.format("(%d, %d)", lowerPosition, upperPosition); + } } public static class IndexesBounds diff --git a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java index dd439c13bb3b..86df9fa6375c 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java @@ -169,6 +169,77 @@ protected TrieIndexEntry getRowIndexEntry(PartitionPosition key, throw new IllegalArgumentException("Invalid op: " + operator); } + private TrieIndexEntry getApproximatePosition(PartitionPosition key, Operator op, boolean isLeftBound) + { + assert op == GT || op == GE; + // We currently only need this method in contexts where neither early opening nor zero copy transfer are used, + // which means we don't have to worry about `filterFirst`/`filterLast`. We could expand this method to support + // those, but it's unclear it will ever be needed, and this would require proper testing, so leaving that aside for now. + assert openReason != OpenReason.MOVED_START : "Early opening is not supported with this method"; + assert !sstableMetadata.zeroCopyMetadata.exists() : "SSTables with zero copy metadata are not supported"; + + try (PartitionIndex.Reader reader = partitionIndex.openReader()) + { + return reader.ceiling(key, (pos, assumeNoMatch, compareKey) -> { + // The goal of the overall method, compared to `getPosition`, is to avoid reading the data file. 
If + // whatever partition we look at has a row index (`pos >= 0`), then `retrieveEntryIfAcceptable` may + // read the row index file, but it will never read the data file, so we can use it like in `getPosition`. + if (pos >= 0) + return retrieveEntryIfAcceptable(op, compareKey, pos, assumeNoMatch); + + // If `assumeNoMatch == false`, then it means we've matched a prefix of the searched key. This means + // `pos` points to a key `K` in the sstable that is "the closest one" to `key`, but it may be + // before, equal or after `key`. In `getPosition`, `retrieveEntryIfAcceptable` reads the + // actual key in the data file to decide which case we're in, and based on that whether we want that entry, + // or the one after that (in order). But here, we explicitly want to avoid that read to the data file, + // so: + // - if it is a left bound, we eagerly accept a prefix entry. If `K > key`, then that's the + // "best" answer anyway. If `K < key`, then returning the "next" key would have been "best", + // but returning `K` is only "one off" and still covers `key`, so it is acceptable. + // - if it is a right bound, then we do not accept a prefix. Whatever `K` is, we will only accept the + // "next" key (`assumeNoMatch` will then be `true`). If it happened that `K < key`, then + // we were right to not return `K` and the "next" key is the best choice. If `K > key`, + // then we're again "one off" compared to the best option, but as we cover `key`, it is + // acceptable. + // We didn't mention `K = key` above because whether it falls in the camp of `>` or `<` in the + // cases above depends on whether `op` is GT or GE, but the overall reasoning above still applies + // in that case. + return isLeftBound || assumeNoMatch ? 
new TrieIndexEntry(~pos) : null; + }); + } + catch (IOException e) + { + markSuspect(); + throw new CorruptSSTableException(e, rowIndexFile.path()); + } + } + + @Override + public PartitionPositionBounds getApproximatePositionsForBounds(AbstractBounds bounds) + { + TrieIndexEntry rieLeft = getApproximatePosition(bounds.left, bounds.inclusiveLeft() ? Operator.GE : Operator.GT, true); + if (rieLeft == null) // empty range + return null; + long left = rieLeft.position; + + TrieIndexEntry rieRight = bounds.right.isMinimum() + ? null + : getApproximatePosition(bounds.right, bounds.inclusiveRight() ? Operator.GT : Operator.GE, false); + long right; + if (rieRight != null) + right = rieRight.position; + else // right is beyond end + right = uncompressedLength(); // this should also be correct for EARLY readers + + if (left >= right) + { + // empty range + return null; + } + + return new PartitionPositionBounds(left, right); + } + /** * Called by {@link #getRowIndexEntry} above (via Reader.ceiling/floor) to check if the position satisfies the full * key constraint. 
This is called once if there is a prefix match (which can be in any relationship with the sought diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java index 3b7162a58877..e01055c8d23c 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java @@ -28,6 +28,7 @@ import java.util.zip.CRC32; import com.google.common.base.Throwables; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,13 +41,13 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat.Components; import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.io.util.File; -import org.apache.cassandra.io.util.FileDataInput; -import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.utils.TimeUUID; +import org.apache.cassandra.io.util.FileInputStreamPlus; import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt; @@ -156,7 +157,7 @@ public Map deserialize(Descriptor descriptor, E } else { - try (RandomAccessReader r = RandomAccessReader.open(statsFile)) + try (FileInputStreamPlus r = new FileInputStreamPlus(statsFile)) { components = deserialize(descriptor, r, types); } @@ -170,7 +171,7 @@ public MetadataComponent deserialize(Descriptor descriptor, MetadataType type) t } public Map deserialize(Descriptor descriptor, - FileDataInput in, + FileInputStreamPlus in, EnumSet selectedTypes) throws IOException { @@ -181,7 +182,7 @@ public Map deserialize(Descriptor descriptor, * Read TOC */ - int length = (int) in.bytesRemaining(); + int length = (int) 
in.getChannel().size(); int count = in.readInt(); updateChecksumInt(crc, count); @@ -249,7 +250,7 @@ public Map deserialize(Descriptor descriptor, return components; } - private static void maybeValidateChecksum(CRC32 crc, FileDataInput in, Descriptor descriptor) throws IOException + private static void maybeValidateChecksum(CRC32 crc, DataInputPlus in, Descriptor descriptor) throws IOException { if (!descriptor.version.hasMetadataChecksum()) return; diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java index e1a70f39a790..7c0399a7a34b 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java @@ -28,11 +28,14 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.Optional; import java.util.Random; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -67,6 +70,7 @@ import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.ReadExecutionController; import org.apache.cassandra.db.RowUpdateBuilder; @@ -120,6 +124,7 @@ import static org.apache.cassandra.config.CassandraRelevantProperties.BF_RECREATE_ON_FP_CHANCE_CHANGE; import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; import static org.apache.cassandra.db.ColumnFamilyStore.FlushReason.UNIT_TESTS; +import static org.apache.cassandra.io.sstable.SSTable.logger; import static org.apache.cassandra.schema.CompressionParams.DEFAULT_CHUNK_LENGTH; 
import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; @@ -1287,6 +1292,168 @@ public void testGetPositionsForRangesFromTableOpenedForBulkLoading() bulkLoaded.selfRef().release(); } + @Test + public void testGetApproximatePositionsForRangesWithRowIndex() + { + int columnIndexSizeKiB = DatabaseDescriptor.getColumnIndexSizeInKiB(); + // This ensures partitions of "any size" have a row index. Though in practice, the code never considers adding + // a row index entry unless there are at least 2 rows, hence we add 2 per-partition below. + DatabaseDescriptor.setColumnIndexSizeInKiB(0); + try + { + testGetApproximatePositionsForRanges(2); + } + finally + { + DatabaseDescriptor.setColumnIndexSizeInKiB(columnIndexSizeKiB); + } + } + + @Test + public void testGetApproximatePositionsForRanges() + { + testGetApproximatePositionsForRanges(1); + } + + public void testGetApproximatePositionsForRanges(int rowPerPartition) + { + ColumnFamilyStore store = discardSSTables(KEYSPACE1, CF_STANDARD2); + partitioner = store.getPartitioner(); + + String[] keys = new String[]{ + "a", + "an", + "ant", + "ante", + "anti", + "to", + "tea", + "teal", + "team", + "ted", + "ten", + "tend", + "yen" + }; + addPartitionsUnsafe(store, rowPerPartition, keys); + + store.forceBlockingFlush(UNIT_TESTS); + CompactionManager.instance.performMaximal(store, false); + + SSTableReader sstable = store.getLiveSSTables().iterator().next(); + + // To avoid depending on the details of the partitioner, we simply scan the full sstable to collect the + // positions of all keys in the data file (and keep them sorted in sstable order). + NavigableMap keyPositions = keyPositions(sstable); + + // Ensure that the positions returned for the ranges passed in argument are "valid" (based on the + // actual positions we collected above). This tests both `SSTableReader#getPositionsForRanges` and + // `SSTableReader#getApproximatePositionsForRanges`. 
+ validatePositions(sstable, new Range<>(partitioner.getMinimumToken(), partitioner.getMinimumToken()), keyPositions); + validatePositions(sstable, new Range<>(t("an"), partitioner.getMinimumToken()), keyPositions); + validatePositions(sstable, new Range<>(partitioner.getMinimumToken(), t("te")), keyPositions); + validatePositions(sstable, new Range<>(t("an"), t("b")), keyPositions); + validatePositions(sstable, new Range<>(t("te"), t("tea")), keyPositions); + validatePositions(sstable, new Range<>(t("g"), t("yen")), keyPositions); + validatePositions(sstable, new Range<>(t("ant"), t("t")), keyPositions); + + // The rest of the tests are designed to work regardless of the partitioner, but those below do depend on + // order. The tests do run with an order-preserving partitioner, but we are being defensive. + if (partitioner.preservesOrder()) + { + validateEmptyRange(sstable, new Range<>(t("bar"), t("foo"))); + validateEmptyRange(sstable, new Range<>(t("zoo"), partitioner.getMinimumToken())); + } + } + + private void addPartitionsUnsafe(ColumnFamilyStore store, int rowPerPartition, String... keys) + { + for (String key : keys) + { + for (int i = 0; i < rowPerPartition; i++) + { + new RowUpdateBuilder(store.metadata(), System.currentTimeMillis(), key) + .clustering(String.valueOf(i)) + .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER) + .build() + .applyUnsafe(); + } + } + } + + // Returns a 2-length array with the first element of the iterator, and then the 2nd one. If any of those does not + // exist, it is set to the default value. 
+ private long[] pullFirst2Values(Iterator> iter, long defaultValue) + { + long[] result = new long[]{ defaultValue, defaultValue }; + if (iter.hasNext()) + { + result[0] = iter.next().getValue(); + if (iter.hasNext()) + result[1] = iter.next().getValue(); + } + return result; + } + + private void validateEmptyRange(SSTableReader sstable, Range range) + { + assertEquals(List.of(), sstable.getPositionsForRanges(List.of(range))); + assertEquals(List.of(), sstable.getApproximatePositionsForRanges(List.of(range))); + } + + private void validatePositions(SSTableReader sstable, Range range, NavigableMap keyPositions) + { + // Testing `getPositionsForRanges` is easy: we can grab the position in `keyPositions` of the `ceiling` entry + // for the start, and the first entry of the `tailMap` from the end bound for the end, and this is what `getPositionsForRanges` should return. + // But `getApproximatePositionsForRanges` is allowed to be "one key off", as long as the returned offset range + // still fully covers the requested range (so for the `lowerPosition`, we can return the key just _before_ the + // expected one, and for `upperPosition`, we can return the key just _after_ the expected one). + // To test both, we create 2-element arrays `start` and `end`. In both cases, the first element is the true + // expected value (what `getPositionsForRanges` should return), and the 2nd element is the only other value that + // is still considered correct for `getApproximatePositionsForRanges`. NOTE(review): `ceilingKey` returns null when no key sorts at or after the range start, which `headMap` would reject; presumably every range passed here starts before the last key - TODO confirm. + + var startIt = keyPositions.headMap(keyPositions.ceilingKey(range.left.maxKeyBound()), true).descendingMap().entrySet().iterator(); + long[] start = pullFirst2Values(startIt, 0); + + var endIt = range.right.isMinimum() + ? Collections.>emptyIterator() + : keyPositions.tailMap(range.right.maxKeyBound(), true).entrySet().iterator(); + long[] end = pullFirst2Values(endIt, sstable.uncompressedLength()); + + // getPositionsForRanges should give us the exact positions we expect. 
+ var positions = sstable.getPositionsForRanges(List.of(range)); + assertEquals(1, positions.size()); + var pos = positions.get(0); + assertEquals(start[0], pos.lowerPosition); + assertEquals(end[0], pos.upperPosition); + + // getApproximatePositionsForRanges can give us the exact positions, but it is allowed to be "one off" as + // long as it is in the right direction (meaning that the returned range covers at least the expected range). + positions = sstable.getApproximatePositionsForRanges(List.of(range)); + assertEquals(1, positions.size()); + pos = positions.get(0); + logger.info("Approx start: {}", pos.lowerPosition); + logger.info("Approx end : {}", pos.upperPosition); + assertTrue(String.format("Computed lower position %d is neither %d nor %d", pos.lowerPosition, start[0], start[1]), pos.lowerPosition == start[0] || pos.lowerPosition == start[1]); + assertTrue(String.format("Computed upper position %d is neither %d nor %d", pos.upperPosition, end[0], end[1]), pos.upperPosition == end[0] || pos.upperPosition == end[1]); + } + + // Sequentially read the sstable to extract the keys, in token order, and their starting position in the data file. NOTE(review): `scanner` is never closed in this method; if `ISSTableScanner` is closeable, wrap it in try-with-resources - TODO confirm. + private NavigableMap keyPositions(SSTableReader sstable) + { + NavigableMap map = new TreeMap<>(); + ISSTableScanner scanner = sstable.getScanner(); + while (scanner.hasNext()) + { + // The scanner is positioned at the start of the partition after the `hasNext`. After the `next()`, it will + // have read the partition key and be positioned after it. And the next `hasNext` is what "exhausts" the + // last returned partition, positioning the scanner after it (and so on the next partition). 
+ long pos = scanner.getCurrentPosition(); + map.put(scanner.next().partitionKey(), pos); + } + return map; + } + @Test public void testIndexSummaryReplacement() throws IOException, ExecutionException, InterruptedException { diff --git a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java index 2d92bc5dee81..3884daf20af0 100644 --- a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java @@ -29,7 +29,6 @@ import java.util.Map; import com.google.common.primitives.Bytes; - import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; @@ -53,9 +52,9 @@ import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.io.util.File; +import org.apache.cassandra.io.util.FileInputStreamPlus; import org.apache.cassandra.io.util.FileOutputStreamPlus; import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.Throwables; @@ -103,7 +102,7 @@ public void testSerialization() throws IOException Descriptor desc = new Descriptor(latestVersion, FileUtils.getTempDir(), "test", "test", new SequenceBasedSSTableId(0)); File statsFile = serialize(originalMetadata, serializer, desc); - try (RandomAccessReader in = RandomAccessReader.open(statsFile)) + try (FileInputStreamPlus in = new FileInputStreamPlus(statsFile)) { Map deserialized = serializer.deserialize(desc, in, EnumSet.allOf(MetadataType.class)); @@ -133,7 +132,7 @@ public void testSerializationWithEncryption() throws IOException Descriptor desc = new Descriptor(format.getLatestVersion(), FileUtils.getTempDir(), "test", "test", new 
SequenceBasedSSTableId(0)); File statsFile = serializeWithEncryption(originalMetadata, serializer, desc); - try (RandomAccessReader in = RandomAccessReader.open(statsFile)) + try (FileInputStreamPlus in = new FileInputStreamPlus(statsFile)) { Map deserialized = deserializeWithEncryption(serializer, desc, in); @@ -143,7 +142,7 @@ public void testSerializationWithEncryption() throws IOException } } } - + @Test public void testEncryption() throws IOException { @@ -186,7 +185,7 @@ public void testHistogramSterilization() throws IOException Descriptor desc = new Descriptor(format.getLatestVersion(), FileUtils.getTempDir(), "test", "test", new SequenceBasedSSTableId(0)); File statsFile = serialize(originalMetadata, serializer, desc); - try (RandomAccessReader in = RandomAccessReader.open(statsFile)) + try (FileInputStreamPlus in = new FileInputStreamPlus(statsFile)) { // Deserialize and verify that the two histograms have had their overflow buckets cleared: Map deserialized = serializer.deserialize(desc, in, EnumSet.allOf(MetadataType.class)); @@ -234,7 +233,7 @@ public File serializeWithEncryption(Map metadat } } - private Map deserializeWithEncryption(MetadataSerializer serializer, Descriptor desc, RandomAccessReader in) throws IOException + private Map deserializeWithEncryption(MetadataSerializer serializer, Descriptor desc, FileInputStreamPlus in) throws IOException { MetadataSerializer.testCompressionParams = compressionParams; try @@ -327,8 +326,8 @@ public void testOldReadsNew(String oldV, String newV) throws IOException File statsFileLa = serialize(originalMetadata, serializer, descOld); // Reading both as earlier version should yield identical results. 
Descriptor desc = new Descriptor(format.getVersion(oldV), statsFileLb.parent(), "", "", new SequenceBasedSSTableId(0)); - try (RandomAccessReader inLb = RandomAccessReader.open(statsFileLb); - RandomAccessReader inLa = RandomAccessReader.open(statsFileLa)) + try (FileInputStreamPlus inLb = new FileInputStreamPlus(statsFileLb); + FileInputStreamPlus inLa = new FileInputStreamPlus(statsFileLa)) { Map deserializedLb = serializer.deserialize(desc, inLb, EnumSet.allOf(MetadataType.class)); Map deserializedLa = serializer.deserialize(desc, inLa, EnumSet.allOf(MetadataType.class));