diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index f240d9a70a14..65d4a0a1df06 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -846,6 +846,45 @@ public List getPositionsForRanges(Collection + * By allowing some imprecision, this method may be faster/avoid reads to the data file that are otherwise + * necessary. In practice, the positions returned by this method may be up to "one key off" for each bound of + * each range. In other words, say we pass range `(t_s, t_e]`, and let's denote by `k_i` the i-th key in the + * underlying sstable. And suppose that `getPositionsForRanges([(t_s, t_e]])` returns `(k_s, k_e)` (with `s` < `e`), + * where `k_i` means the position of the i-th key in the sstable. Then this method applied to this same range may return + * either one of `(k_s, k_e)` (same result), `(k_s-1, k_e)`, `(k_s, k_e+1)`, or `(k_s-1, k_e+1)`. + *

+ * Also note that as a consequence of this, the returned list of position bounds may have some strict overlap + * (the method could return something along the lines of `[(0, 100), (80, 200)]`). But all the starting positions + * and all the ending positions will still be ordered. + */ + public List getApproximatePositionsForRanges(Collection> ranges) + { + List positions = new ArrayList<>(); + for (Range range : Range.normalize(ranges)) + { + assert !range.isWrapAround() || range.right.isMinimum(); + AbstractBounds bounds = Range.makeRowRange(range); + PartitionPositionBounds pb = getApproximatePositionsForBounds(bounds); + if (pb != null) + positions.add(pb); + } + return positions; + } + + /** + * This is to {@link #getPositionsForBounds(AbstractBounds)} what {@link #getApproximatePositionsForRanges(Collection)} + * is to {@link #getPositionsForRanges(Collection)}: each returned bound may be up to "one key off". A {@code null} result is skipped by {@link #getApproximatePositionsForRanges(Collection)}. + */ + public PartitionPositionBounds getApproximatePositionsForBounds(AbstractBounds bounds) + { + // Exact positions are always a valid approximation, so return them by default; concrete sstable implementations can override this. + return getPositionsForBounds(bounds); + } + /** * Get a list of data positions in this SSTable that correspond to the given list of bounds. This method will remove * non-covered intervals, but will not correct order or overlap in the supplied list, e.g. 
if bounds overlap, the @@ -2032,6 +2071,12 @@ public final boolean equals(Object o) PartitionPositionBounds that = (PartitionPositionBounds) o; return lowerPosition == that.lowerPosition && upperPosition == that.upperPosition; } + + @Override + public String toString() + { + return String.format("(%d, %d)", lowerPosition, upperPosition); + } } public static class IndexesBounds diff --git a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java index dd439c13bb3b..86df9fa6375c 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java @@ -169,6 +169,77 @@ protected TrieIndexEntry getRowIndexEntry(PartitionPosition key, throw new IllegalArgumentException("Invalid op: " + operator); } + private TrieIndexEntry getApproximatePosition(PartitionPosition key, Operator op, boolean isLeftBound) + { + assert op == GT || op == GE; + // We currently only need this method in contexts where neither early opening nor zero copy transfer are used, + // which means we don't have to worry about `filterFirst`/`filterLast`. We could expand that method to support + // those, but it's unclear it will ever be needed, and this would require proper testing, so leaving aside for now. + assert openReason != OpenReason.MOVED_START : "Early opening is not supported with this method"; + assert !sstableMetadata.zeroCopyMetadata.exists() : "SSTables with zero copy metadata are not supported"; + + try (PartitionIndex.Reader reader = partitionIndex.openReader()) + { + return reader.ceiling(key, (pos, assumeNoMatch, compareKey) -> { + // The goal of the overall method, compared to `getPosition`, is to avoid reading the data file. 
If + // whatever partition we look at has a row index (`pos >= 0`), then `retrieveEntryIfAcceptable` may + // read the row index file, but it will never read the data file, so we can use it like in `getPosition`. + if (pos >= 0) + return retrieveEntryIfAcceptable(op, compareKey, pos, assumeNoMatch); + + // If `assumeNoMatch == false`, then it means we've matched a prefix of the searched key. This means + // `pos` points to a key `K` in the sstable that is "the closest one" to `searchKey`, but it may be + // before, equal to or after `searchKey`. In `getPosition`, `retrieveEntryIfAcceptable` reads the + // actual key in the data file to decide which case we're in, and based on that whether we want that entry, + // or the one after that (in order). But here, we explicitly want to avoid that read to the data file, + // so: + // - if it is a left bound, we eagerly accept a prefix entry. If `K > searchKey`, then that's the + // "best" answer anyway. If `K < searchKey`, then returning the "next" key would have been "best", + // but returning `K` is only "one off" and still covers `searchKey`, so it is acceptable. + // - if it is a right bound, then we do not accept a prefix. Whatever `K` is, we will only accept the + // "next" key (`assumeNoMatch` will then be `true`). If it happened that `K < searchKey`, then + // we were right to not return `K` and the "next" key is the best choice. If `K > searchKey`, + // then we're again "one off" compared to the best option, but as we cover `searchKey`, it is + // acceptable. + // We didn't mention `K = searchKey` above because whether it falls in the camp of `>` or `<` in the + // cases above depends on whether `op` is GT or GE, but the overall reasoning extends to that case + // as well. + return isLeftBound || assumeNoMatch ? 
new TrieIndexEntry(~pos) : null; + }); + } + catch (IOException e) + { + markSuspect(); + throw new CorruptSSTableException(e, rowIndexFile.path()); + } + } + + @Override + public PartitionPositionBounds getApproximatePositionsForBounds(AbstractBounds bounds) + { + TrieIndexEntry rieLeft = getApproximatePosition(bounds.left, bounds.inclusiveLeft() ? Operator.GE : Operator.GT, true); + if (rieLeft == null) // empty range + return null; + long left = rieLeft.position; + + TrieIndexEntry rieRight = bounds.right.isMinimum() + ? null + : getApproximatePosition(bounds.right, bounds.inclusiveRight() ? Operator.GT : Operator.GE, false); + long right; + if (rieRight != null) + right = rieRight.position; + else // right is beyond end + right = uncompressedLength(); // this should also be correct for EARLY readers + + if (left >= right) + { + // empty range + return null; + } + + return new PartitionPositionBounds(left, right); + } + /** * Called by {@link #getRowIndexEntry} above (via Reader.ceiling/floor) to check if the position satisfies the full * key constraint. 
This is called once if there is a prefix match (which can be in any relationship with the sought diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java index 3b7162a58877..e01055c8d23c 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java @@ -28,6 +28,7 @@ import java.util.zip.CRC32; import com.google.common.base.Throwables; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,13 +41,13 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat.Components; import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.io.util.File; -import org.apache.cassandra.io.util.FileDataInput; -import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.utils.TimeUUID; +import org.apache.cassandra.io.util.FileInputStreamPlus; import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt; @@ -156,7 +157,7 @@ public Map deserialize(Descriptor descriptor, E } else { - try (RandomAccessReader r = RandomAccessReader.open(statsFile)) + try (FileInputStreamPlus r = new FileInputStreamPlus(statsFile)) { components = deserialize(descriptor, r, types); } @@ -170,7 +171,7 @@ public MetadataComponent deserialize(Descriptor descriptor, MetadataType type) t } public Map deserialize(Descriptor descriptor, - FileDataInput in, + FileInputStreamPlus in, EnumSet selectedTypes) throws IOException { @@ -181,7 +182,7 @@ public Map deserialize(Descriptor descriptor, * Read TOC */ - int length = (int) in.bytesRemaining(); + int length = (int) 
in.getChannel().size(); int count = in.readInt(); updateChecksumInt(crc, count); @@ -249,7 +250,7 @@ public Map deserialize(Descriptor descriptor, return components; } - private static void maybeValidateChecksum(CRC32 crc, FileDataInput in, Descriptor descriptor) throws IOException + private static void maybeValidateChecksum(CRC32 crc, DataInputPlus in, Descriptor descriptor) throws IOException { if (!descriptor.version.hasMetadataChecksum()) return; diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java index e1a70f39a790..7c0399a7a34b 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java @@ -28,11 +28,14 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.Optional; import java.util.Random; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -67,6 +70,7 @@ import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.ReadExecutionController; import org.apache.cassandra.db.RowUpdateBuilder; @@ -120,6 +124,7 @@ import static org.apache.cassandra.config.CassandraRelevantProperties.BF_RECREATE_ON_FP_CHANCE_CHANGE; import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; import static org.apache.cassandra.db.ColumnFamilyStore.FlushReason.UNIT_TESTS; +import static org.apache.cassandra.io.sstable.SSTable.logger; import static org.apache.cassandra.schema.CompressionParams.DEFAULT_CHUNK_LENGTH; 
import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; @@ -1287,6 +1292,170 @@ public void testGetPositionsForRangesFromTableOpenedForBulkLoading() bulkLoaded.selfRef().release(); } + @Test + public void testGetApproximatePositionsForRangesWithRowIndex() + { + int columnIndexSizeKiB = DatabaseDescriptor.getColumnIndexSizeInKiB(); + // This ensures partitions of "any size" have a row index. Though in practice, the code never consider adding + // a row index entry unless there is at least 2 rows, hence we add 2 per-partition below. + DatabaseDescriptor.setColumnIndexSizeInKiB(0); + try + { + testGetApproximatePositionsForRanges(2); + } + finally + { + DatabaseDescriptor.setColumnIndexSizeInKiB(columnIndexSizeKiB); + } + } + + @Test + public void testGetApproximatePositionsForRanges() + { + testGetApproximatePositionsForRanges(1); + } + + public void testGetApproximatePositionsForRanges(int rowPerPartition) + { + ColumnFamilyStore store = discardSSTables(KEYSPACE1, CF_STANDARD2); + partitioner = store.getPartitioner(); + + String[] keys = new String[]{ + "a", + "an", + "ant", + "ante", + "anti", + "to", + "tea", + "teal", + "team", + "ted", + "ten", + "tend", + "yen" + }; + addPartitionsUnsafe(store, rowPerPartition, keys); + + store.forceBlockingFlush(UNIT_TESTS); + CompactionManager.instance.performMaximal(store, false); + + SSTableReader sstable = store.getLiveSSTables().iterator().next(); + + // To avoid depending on the details of the partitioner, we simply scan the full sstable to collect the + // positions of all keys in the data file (and keep them sorted in sstable order). + NavigableMap keyPositions = keyPositions(sstable); + + // Ensure that getting the positions corresponding to the ranges passed in argument are "valid" (based on the + // actual positions we collected above). This tests both `SSTableReader#getPositionsForRanges` and + // `SSTableReader#getApproximatePositionsForRanges`. 
+ validatePositions(sstable, new Range<>(partitioner.getMinimumToken(), partitioner.getMinimumToken()), keyPositions); + validatePositions(sstable, new Range<>(t("an"), partitioner.getMinimumToken()), keyPositions); + validatePositions(sstable, new Range<>(partitioner.getMinimumToken(), t("te")), keyPositions); + validatePositions(sstable, new Range<>(t("an"), t("b")), keyPositions); + validatePositions(sstable, new Range<>(t("te"), t("tea")), keyPositions); + validatePositions(sstable, new Range<>(t("g"), t("yen")), keyPositions); + validatePositions(sstable, new Range<>(t("ant"), t("t")), keyPositions); + + // The rest of the tests are designed to work regardless of the partitioner, but those below do depend on + // order. Tests do run with ordered partitioner but being defensive. + if (partitioner.preservesOrder()) + { + validateEmptyRange(sstable, new Range<>(t("bar"), t("foo"))); + validateEmptyRange(sstable, new Range<>(t("zoo"), partitioner.getMinimumToken())); + } + } + + private void addPartitionsUnsafe(ColumnFamilyStore store, int rowPerPartition, String... keys) + { + for (String key : keys) + { + for (int i = 0; i < rowPerPartition; i++) + { + new RowUpdateBuilder(store.metadata(), System.currentTimeMillis(), key) + .clustering(String.valueOf(i)) + .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER) + .build() + .applyUnsafe(); + } + } + } + + // Returns a 2-length array with the first element of the iterator, and then the 2nd one. If any of those does not + // exist, it is set to the default value. 
+ private long[] pullFirst2Values(Iterator> iter, long defaultValue) + { + long[] result = new long[]{ defaultValue, defaultValue }; + if (iter.hasNext()) + { + result[0] = iter.next().getValue(); + if (iter.hasNext()) + result[1] = iter.next().getValue(); + } + return result; + } + + private void validateEmptyRange(SSTableReader sstable, Range range) + { + assertEquals(List.of(), sstable.getPositionsForRanges(List.of(range))); + assertEquals(List.of(), sstable.getApproximatePositionsForRanges(List.of(range))); + } + + private void validatePositions(SSTableReader sstable, Range range, NavigableMap keyPositions) + { + // Testing `getPositionsForRanges` is easy: we can grab the position in `keyPosition` of the `ceiling` entry + // for the start, and the `floor` entry for the end, and this is what `getPositionsForRanges` should return. + // But `getApproximatePositionsForRanges` is allowed to be "one key off", as long as the returned offset range + // still fully cover the requested range (so for the `lowerPosition`, we can return the key just _before_ the + // expected one, and for `upperPosition`, we can return key just _after_ the expected one). + // To test both, we create 2-element arrays `start` and `end`. In both cases, the first element is the true + // expected value (what `getPositionsForRange` should return), and the 2nd element is the only other value that + // is still considered correct for `getApproximatePositionsForRange`. + + var startIt = keyPositions.headMap(keyPositions.ceilingKey(range.left.maxKeyBound()), true).descendingMap().entrySet().iterator(); + long[] start = pullFirst2Values(startIt, 0); + + var endIt = range.right.isMinimum() + ? Collections.>emptyIterator() + : keyPositions.tailMap(range.right.maxKeyBound(), true).entrySet().iterator(); + long[] end = pullFirst2Values(endIt, sstable.uncompressedLength()); + + // getPositionsForRanges should give us the exact positions we expect. 
+ var positions = sstable.getPositionsForRanges(List.of(range)); + assertEquals(1, positions.size()); + var pos = positions.get(0); + assertEquals(start[0], pos.lowerPosition); + assertEquals(end[0], pos.upperPosition); + + // getApproximatePositionsForRanges can give us the exact positions, but it is allowed to be "one off" as + // long as it is in the right direction (meaning that the returned ranges covers at least the expected range). + positions = sstable.getApproximatePositionsForRanges(List.of(range)); + assertEquals(1, positions.size()); + pos = positions.get(0); + logger.info("Approx start: {}", pos.lowerPosition); + logger.info("Approx end : {}", pos.upperPosition); + assertTrue(String.format("Computed lower position %d is neither %d nor %d", pos.lowerPosition, start[0], start[1]), pos.lowerPosition == start[0] || pos.lowerPosition == start[1]); + assertTrue(String.format("Computed upper position %d is neither %d nor %d", pos.upperPosition, end[0], end[1]), pos.upperPosition == end[0] || pos.upperPosition == end[1]); + } + + // Sequentially read the sstable to extract the keys, in token order, and their starting position in the data file. The scanner is opened in a try-with-resources so it is closed once the scan completes, even if the scan fails part-way. + private NavigableMap keyPositions(SSTableReader sstable) + { + NavigableMap map = new TreeMap<>(); + try (ISSTableScanner scanner = sstable.getScanner()) + { + while (scanner.hasNext()) + { + // The scanner is positioned at the start of the partition after the `hasNext`. After the `next()`, it will + // have read the partition key and be positioned after it. And the next `hasNext` is what "exhaust" the + // last returned partition, positioning the scanner after it (and so on the next partition). 
+ long pos = scanner.getCurrentPosition(); + map.put(scanner.next().partitionKey(), pos); + } + } + return map; + } + @Test public void testIndexSummaryReplacement() throws IOException, ExecutionException, InterruptedException { diff --git a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java index 2d92bc5dee81..3884daf20af0 100644 --- a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java @@ -29,7 +29,6 @@ import java.util.Map; import com.google.common.primitives.Bytes; - import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; @@ -53,9 +52,9 @@ import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.io.util.File; +import org.apache.cassandra.io.util.FileInputStreamPlus; import org.apache.cassandra.io.util.FileOutputStreamPlus; import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.Throwables; @@ -103,7 +102,7 @@ public void testSerialization() throws IOException Descriptor desc = new Descriptor(latestVersion, FileUtils.getTempDir(), "test", "test", new SequenceBasedSSTableId(0)); File statsFile = serialize(originalMetadata, serializer, desc); - try (RandomAccessReader in = RandomAccessReader.open(statsFile)) + try (FileInputStreamPlus in = new FileInputStreamPlus(statsFile)) { Map deserialized = serializer.deserialize(desc, in, EnumSet.allOf(MetadataType.class)); @@ -133,7 +132,7 @@ public void testSerializationWithEncryption() throws IOException Descriptor desc = new Descriptor(format.getLatestVersion(), FileUtils.getTempDir(), "test", "test", new 
SequenceBasedSSTableId(0)); File statsFile = serializeWithEncryption(originalMetadata, serializer, desc); - try (RandomAccessReader in = RandomAccessReader.open(statsFile)) + try (FileInputStreamPlus in = new FileInputStreamPlus(statsFile)) { Map deserialized = deserializeWithEncryption(serializer, desc, in); @@ -143,7 +142,7 @@ public void testSerializationWithEncryption() throws IOException } } } - + @Test public void testEncryption() throws IOException { @@ -186,7 +185,7 @@ public void testHistogramSterilization() throws IOException Descriptor desc = new Descriptor(format.getLatestVersion(), FileUtils.getTempDir(), "test", "test", new SequenceBasedSSTableId(0)); File statsFile = serialize(originalMetadata, serializer, desc); - try (RandomAccessReader in = RandomAccessReader.open(statsFile)) + try (FileInputStreamPlus in = new FileInputStreamPlus(statsFile)) { // Deserialize and verify that the two histograms have had their overflow buckets cleared: Map deserialized = serializer.deserialize(desc, in, EnumSet.allOf(MetadataType.class)); @@ -234,7 +233,7 @@ public File serializeWithEncryption(Map metadat } } - private Map deserializeWithEncryption(MetadataSerializer serializer, Descriptor desc, RandomAccessReader in) throws IOException + private Map deserializeWithEncryption(MetadataSerializer serializer, Descriptor desc, FileInputStreamPlus in) throws IOException { MetadataSerializer.testCompressionParams = compressionParams; try @@ -327,8 +326,8 @@ public void testOldReadsNew(String oldV, String newV) throws IOException File statsFileLa = serialize(originalMetadata, serializer, descOld); // Reading both as earlier version should yield identical results. 
Descriptor desc = new Descriptor(format.getVersion(oldV), statsFileLb.parent(), "", "", new SequenceBasedSSTableId(0)); - try (RandomAccessReader inLb = RandomAccessReader.open(statsFileLb); - RandomAccessReader inLa = RandomAccessReader.open(statsFileLa)) + try (FileInputStreamPlus inLb = new FileInputStreamPlus(statsFileLb); + FileInputStreamPlus inLa = new FileInputStreamPlus(statsFileLa)) { Map deserializedLb = serializer.deserialize(desc, inLb, EnumSet.allOf(MetadataType.class)); Map deserializedLa = serializer.deserialize(desc, inLa, EnumSet.allOf(MetadataType.class));