45 changes: 45 additions & 0 deletions src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -846,6 +846,45 @@ public List<PartitionPositionBounds> getPositionsForRanges(Collection<Range<Token>> ranges)
return positions;
}

/**
* Similar to {@link #getPositionsForRanges(Collection)}, but this method is allowed to return positions that
* cover "more" than strictly the provided ranges.
* <p>
* By allowing some imprecision, this method may be faster and may avoid reads of the data file that would
* otherwise be necessary. In practice, the positions returned by this method may be up to "one key off" for each
* bound of each range. In other words, say we pass the range `(t_s, t_e]`, and let's denote by `k_i` the position
* of the i-th key in the underlying sstable. Suppose that `getPositionsForRanges([(t_s, t_e]])` returns
* `(k_s, k_e)` (with `s` < `e`). Then this method applied to that same range may return any one of `(k_s, k_e)`
* (the same result), `(k_s-1, k_e)`, `(k_s, k_e+1)`, or `(k_s-1, k_e+1)` (a short usage sketch follows the
* method below).
* <p>
* Also note that as a consequence of this, the returned list of position bounds may have some strict overlap
* (the method could return something along the lines of `[(0, 100), (80, 200)]`). But all the starting positions
* and all the ending positions will still be ordered.
*/
public List<PartitionPositionBounds> getApproximatePositionsForRanges(Collection<Range<Token>> ranges)
{
List<PartitionPositionBounds> positions = new ArrayList<>();
for (Range<Token> range : Range.normalize(ranges))
{
assert !range.isWrapAround() || range.right.isMinimum();
AbstractBounds<PartitionPosition> bounds = Range.makeRowRange(range);
PartitionPositionBounds pb = getApproximatePositionsForBounds(bounds);
if (pb != null)
positions.add(pb);
}
return positions;
}
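
// A minimal usage sketch: because the approximate bounds are never narrower than the exact ones, they can be
// substituted wherever a superset of the covered data positions is acceptable, e.g. to cheaply estimate how many
// bytes of the data file a set of ranges touches. The helper below is hypothetical and purely illustrative.
private long approximateCoveredBytes(Collection<Range<Token>> ranges)
{
    long total = 0;
    for (PartitionPositionBounds pb : getApproximatePositionsForRanges(ranges)) // may cover slightly more than `ranges`
        total += pb.upperPosition - pb.lowerPosition;
    return total;
}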

/**
* This is to {@link #getPositionsForBounds(AbstractBounds)} what {@link #getApproximatePositionsForRanges(Collection)}
* is to {@link #getPositionsForRanges(Collection)}.
*/
public PartitionPositionBounds getApproximatePositionsForBounds(AbstractBounds<PartitionPosition> bounds)
{
// Return the exact positions by default; this can be overridden by concrete sstable implementations.
return getPositionsForBounds(bounds);
}

/**
* Get a list of data positions in this SSTable that correspond to the given list of bounds. This method will remove
* non-covered intervals, but will not correct order or overlap in the supplied list, e.g. if bounds overlap, the
@@ -2032,6 +2071,12 @@ public final boolean equals(Object o)
PartitionPositionBounds that = (PartitionPositionBounds) o;
return lowerPosition == that.lowerPosition && upperPosition == that.upperPosition;
}

@Override
public String toString()
{
return String.format("(%d, %d)", lowerPosition, upperPosition);
}
}

public static class IndexesBounds
@@ -169,6 +169,77 @@ protected TrieIndexEntry getRowIndexEntry(PartitionPosition key,
throw new IllegalArgumentException("Invalid op: " + operator);
}

private TrieIndexEntry getApproximatePosition(PartitionPosition key, Operator op, boolean isLeftBound)
{
assert op == GT || op == GE;
// We currently only need this method in contexts where neither early opening nor zero copy transfer is used,
// which means we don't have to worry about `filterFirst`/`filterLast`. We could expand this method to support
// those, but it's unclear whether that will ever be needed, and it would require proper testing, so we leave it
// aside for now.
assert openReason != OpenReason.MOVED_START : "Early opening is not supported with this method";
assert !sstableMetadata.zeroCopyMetadata.exists() : "SSTables with zero copy metadata are not supported";

try (PartitionIndex.Reader reader = partitionIndex.openReader())
{
return reader.ceiling(key, (pos, assumeNoMatch, compareKey) -> {
// The goal of this method, compared to `getPosition`, is to avoid reading the data file. If the
// partition we are looking at has a row index (`pos >= 0`), then `retrieveEntryIfAcceptable` may
// read the row index file, but it will never read the data file, so we can use it as `getPosition` does.
if (pos >= 0)
return retrieveEntryIfAcceptable(op, compareKey, pos, assumeNoMatch);

// If `assumeNoMatch == false`, we have matched a prefix of the searched key. This means `pos` points
// to a key `K` in the sstable that is the closest one to `searchKey`, but it may sort before, equal
// to, or after `searchKey`. In `getPosition`, `retrieveEntryIfAcceptable` reads the actual key from
// the data file to decide which case we are in, and based on that decides whether we want that entry
// or the one after it (in sstable order). But here, we explicitly want to avoid that read of the data
// file, so:
// - if this is a left bound, we eagerly accept the prefix entry. If `K > searchKey`, then that is the
//   "best" answer anyway. If `K < searchKey`, then returning the "next" key would have been "best",
//   but returning `K` is only "one key off" and still covers `searchKey`, so it is acceptable.
// - if this is a right bound, we do not accept a prefix. Whatever `K` is, we only accept the "next"
//   key (`assumeNoMatch` will then be `true`). If `K < searchKey`, we were right not to return `K`
//   and the "next" key is the best choice. If `K > searchKey`, we are again "one key off" compared to
//   the best option, but since we still cover `searchKey`, it is acceptable.
// We do not discuss `K = searchKey` above because whether it falls in the `>` or `<` camp depends on
// whether `op` is GT or GE, but the reasoning above extends to it either way (a standalone sketch of
// this decision follows this method).
return isLeftBound || assumeNoMatch ? new TrieIndexEntry(~pos) : null;
});
}
catch (IOException e)
{
markSuspect();
throw new CorruptSSTableException(e, rowIndexFile.path());
}
}
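
// A self-contained sketch of the decision above, using a `java.util.NavigableMap` of key -> data-file position as a
// hypothetical stand-in for the partition index (names and data are illustrative, not actual Cassandra APIs). When
// the index only tells us that `match` shares a prefix with the searched key, we cannot know whether the real key
// sorts before or after the searched one without reading the data file; taking `match` itself for a left bound, and
// the entry following `match` for a right bound, is at worst "one key off" but always covers the searched key.
private static long approximateBoundSketch(java.util.NavigableMap<String, Long> index, String match,
                                           boolean isLeftBound, long dataFileLength)
{
    if (isLeftBound)
        return index.get(match); // eagerly accept the prefix match
    java.util.Map.Entry<String, Long> next = index.higherEntry(match); // only accept the entry after the match
    return next != null ? next.getValue() : dataFileLength; // past the last key: up to the end of the data file
}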

@Override
public PartitionPositionBounds getApproximatePositionsForBounds(AbstractBounds<PartitionPosition> bounds)
{
TrieIndexEntry rieLeft = getApproximatePosition(bounds.left, bounds.inclusiveLeft() ? Operator.GE : Operator.GT, true);
if (rieLeft == null) // empty range
return null;
long left = rieLeft.position;

TrieIndexEntry rieRight = bounds.right.isMinimum()
? null
: getApproximatePosition(bounds.right, bounds.inclusiveRight() ? Operator.GT : Operator.GE, false);
long right;
if (rieRight != null)
right = rieRight.position;
else // right is beyond end
right = uncompressedLength(); // this should also be correct for EARLY readers

if (left >= right)
{
// empty range
return null;
}

return new PartitionPositionBounds(left, right);
}

/**
* Called by {@link #getRowIndexEntry} above (via Reader.ceiling/floor) to check if the position satisfies the full
* key constraint. This is called once if there is a prefix match (which can be in any relationship with the sought
@@ -28,6 +28,7 @@
import java.util.zip.CRC32;

import com.google.common.base.Throwables;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -40,13 +41,13 @@
import org.apache.cassandra.io.sstable.format.SSTableFormat.Components;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.io.util.FileInputStreamPlus;

import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;

@@ -156,7 +157,7 @@ public Map<MetadataType, MetadataComponent> deserialize(Descriptor descriptor, E
}
else
{
try (RandomAccessReader r = RandomAccessReader.open(statsFile))
try (FileInputStreamPlus r = new FileInputStreamPlus(statsFile))
{
components = deserialize(descriptor, r, types);
}
@@ -170,7 +171,7 @@ public MetadataComponent deserialize(Descriptor descriptor, MetadataType type) t
}

public Map<MetadataType, MetadataComponent> deserialize(Descriptor descriptor,
FileDataInput in,
FileInputStreamPlus in,
EnumSet<MetadataType> selectedTypes)
throws IOException
{
@@ -181,7 +182,7 @@ public Map<MetadataType, MetadataComponent> deserialize(Descriptor descriptor,
* Read TOC
*/

int length = (int) in.bytesRemaining();
int length = (int) in.getChannel().size();

int count = in.readInt();
updateChecksumInt(crc, count);
@@ -249,7 +250,7 @@ public Map<MetadataType, MetadataComponent> deserialize(Descriptor descriptor,
return components;
}

private static void maybeValidateChecksum(CRC32 crc, FileDataInput in, Descriptor descriptor) throws IOException
private static void maybeValidateChecksum(CRC32 crc, DataInputPlus in, Descriptor descriptor) throws IOException
{
if (!descriptor.version.hasMetadataChecksum())
return;
167 changes: 167 additions & 0 deletions test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -28,11 +28,14 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -67,6 +70,7 @@
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.db.RowUpdateBuilder;
@@ -120,6 +124,7 @@
import static org.apache.cassandra.config.CassandraRelevantProperties.BF_RECREATE_ON_FP_CHANCE_CHANGE;
import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
import static org.apache.cassandra.db.ColumnFamilyStore.FlushReason.UNIT_TESTS;
import static org.apache.cassandra.io.sstable.SSTable.logger;
import static org.apache.cassandra.schema.CompressionParams.DEFAULT_CHUNK_LENGTH;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
@@ -1287,6 +1292,168 @@ public void testGetPositionsForRangesFromTableOpenedForBulkLoading()
bulkLoaded.selfRef().release();
}

@Test
public void testGetApproximatePositionsForRangesWithRowIndex()
{
int columnIndexSizeKiB = DatabaseDescriptor.getColumnIndexSizeInKiB();
// This ensures partitions of "any size" have a row index. Though in practice, the code never considers adding
// a row index entry unless there are at least 2 rows, hence we add 2 rows per partition below.
DatabaseDescriptor.setColumnIndexSizeInKiB(0);
try
{
testGetApproximatePositionsForRanges(2);
}
finally
{
DatabaseDescriptor.setColumnIndexSizeInKiB(columnIndexSizeKiB);
}
}

@Test
public void testGetApproximatePositionsForRanges()
{
testGetApproximatePositionsForRanges(1);
}

public void testGetApproximatePositionsForRanges(int rowPerPartition)
{
ColumnFamilyStore store = discardSSTables(KEYSPACE1, CF_STANDARD2);
partitioner = store.getPartitioner();

String[] keys = new String[]{
"a",
"an",
"ant",
"ante",
"anti",
"to",
"tea",
"teal",
"team",
"ted",
"ten",
"tend",
"yen"
};
addPartitionsUnsafe(store, rowPerPartition, keys);

store.forceBlockingFlush(UNIT_TESTS);
CompactionManager.instance.performMaximal(store, false);

SSTableReader sstable = store.getLiveSSTables().iterator().next();

// To avoid depending on the details of the partitioner, we simply scan the full sstable to collect the
// positions of all keys in the data file (and keep them sorted in sstable order).
NavigableMap<PartitionPosition, Long> keyPositions = keyPositions(sstable);

// Ensure that the positions returned for the ranges passed as argument are "valid" (based on the
// actual positions we collected above). This tests both `SSTableReader#getPositionsForRanges` and
// `SSTableReader#getApproximatePositionsForRanges`.
validatePositions(sstable, new Range<>(partitioner.getMinimumToken(), partitioner.getMinimumToken()), keyPositions);
validatePositions(sstable, new Range<>(t("an"), partitioner.getMinimumToken()), keyPositions);
validatePositions(sstable, new Range<>(partitioner.getMinimumToken(), t("te")), keyPositions);
validatePositions(sstable, new Range<>(t("an"), t("b")), keyPositions);
validatePositions(sstable, new Range<>(t("te"), t("tea")), keyPositions);
validatePositions(sstable, new Range<>(t("g"), t("yen")), keyPositions);
validatePositions(sstable, new Range<>(t("ant"), t("t")), keyPositions);

// The checks above are designed to work regardless of the partitioner, but those below do depend on key
// order. Tests do run with an order-preserving partitioner, but we stay defensive just in case.
if (partitioner.preservesOrder())
{
validateEmptyRange(sstable, new Range<>(t("bar"), t("foo")));
validateEmptyRange(sstable, new Range<>(t("zoo"), partitioner.getMinimumToken()));
}
}

private void addPartitionsUnsafe(ColumnFamilyStore store, int rowPerPartition, String... keys)
{
for (String key : keys)
{
for (int i = 0; i < rowPerPartition; i++)
{
new RowUpdateBuilder(store.metadata(), System.currentTimeMillis(), key)
.clustering(String.valueOf(i))
.add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
.build()
.applyUnsafe();
}
}
}

// Returns a 2-element array with the first value of the iterator followed by the second one. If either of those
// does not exist, it is set to the default value.
private long[] pullFirst2Values(Iterator<Map.Entry<PartitionPosition, Long>> iter, long defaultValue)
{
long[] result = new long[]{ defaultValue, defaultValue };
if (iter.hasNext())
{
result[0] = iter.next().getValue();
if (iter.hasNext())
result[1] = iter.next().getValue();
}
return result;
}

private void validateEmptyRange(SSTableReader sstable, Range<Token> range)
{
assertEquals(List.of(), sstable.getPositionsForRanges(List.of(range)));
assertEquals(List.of(), sstable.getApproximatePositionsForRanges(List.of(range)));
}

private void validatePositions(SSTableReader sstable, Range<Token> range, NavigableMap<PartitionPosition, Long> keyPositions)
{
// Testing `getPositionsForRanges` is easy: the expected `lowerPosition` is the position in `keyPositions` of
// the first key at or after the range start, and the expected `upperPosition` is the position of the first key
// past the range end (or the sstable length if there is none); this is exactly what `getPositionsForRanges`
// should return.
// But `getApproximatePositionsForRanges` is allowed to be "one key off", as long as the returned offset range
// still fully covers the requested range (so for `lowerPosition`, it may return the position of the key just
// _before_ the expected one, and for `upperPosition`, the position of the key just _after_ the expected one).
// To test both, we create 2-element arrays `start` and `end`. In both cases, the first element is the exact
// expected value (what `getPositionsForRanges` should return), and the second element is the only other value
// that is still considered correct for `getApproximatePositionsForRanges` (a standalone illustration of how the
// candidates are extracted follows this method).

var startIt = keyPositions.headMap(keyPositions.ceilingKey(range.left.maxKeyBound()), true).descendingMap().entrySet().iterator();
long[] start = pullFirst2Values(startIt, 0);

var endIt = range.right.isMinimum()
? Collections.<Map.Entry<PartitionPosition, Long>>emptyIterator()
: keyPositions.tailMap(range.right.maxKeyBound(), true).entrySet().iterator();
long[] end = pullFirst2Values(endIt, sstable.uncompressedLength());

// getPositionsForRanges should give us the exact positions we expect.
var positions = sstable.getPositionsForRanges(List.of(range));
assertEquals(1, positions.size());
var pos = positions.get(0);
assertEquals(start[0], pos.lowerPosition);
assertEquals(end[0], pos.upperPosition);

// getApproximatePositionsForRanges can give us the exact positions, but it is allowed to be "one off" as
// long as it is in the right direction (meaning that the returned range covers at least the expected one).
positions = sstable.getApproximatePositionsForRanges(List.of(range));
assertEquals(1, positions.size());
pos = positions.get(0);
logger.info("Approx start: {}", pos.lowerPosition);
logger.info("Approx end : {}", pos.upperPosition);
assertTrue(String.format("Computed lower position %d is neither %d nor %d", pos.lowerPosition, start[0], start[1]), pos.lowerPosition == start[0] || pos.lowerPosition == start[1]);
assertTrue(String.format("Computed upper position %d is neither %d nor %d", pos.upperPosition, end[0], end[1]), pos.upperPosition == end[0] || pos.upperPosition == end[1]);
}
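
// A standalone illustration of the candidate extraction above, on hypothetical data: with key positions
// {10 -> 100, 20 -> 200, 30 -> 300} and a range whose left bound falls between keys 10 and 20, the exact expected
// start is 200 and the only other acceptable approximate start is the previous key's position, 100.
private static long[] startCandidatesSketch()
{
    NavigableMap<Integer, Long> keyPositions = new TreeMap<>();
    keyPositions.put(10, 100L);
    keyPositions.put(20, 200L);
    keyPositions.put(30, 300L);
    Integer expectedStartKey = keyPositions.ceilingKey(15); // 20, the first key covered by the range
    Iterator<Map.Entry<Integer, Long>> it =
        keyPositions.headMap(expectedStartKey, true).descendingMap().entrySet().iterator();
    long exact = it.next().getValue();                      // 200, what getPositionsForRanges must return
    long oneOff = it.hasNext() ? it.next().getValue() : 0L; // 100, the "one key off" alternative
    return new long[]{ exact, oneOff };
}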

// Sequentially read the sstable to extract the keys, in token order, and their starting position in the data file.
private NavigableMap<PartitionPosition, Long> keyPositions(SSTableReader sstable)
{
NavigableMap<PartitionPosition, Long> map = new TreeMap<>();
ISSTableScanner scanner = sstable.getScanner();
while (scanner.hasNext())
{
// After `hasNext`, the scanner is positioned at the start of the next partition. After `next()`, it will
// have read the partition key and be positioned just after it. The subsequent `hasNext` is what "exhausts" the
// last returned partition, positioning the scanner right after it (i.e. at the start of the next partition).
long pos = scanner.getCurrentPosition();
map.put(scanner.next().partitionKey(), pos);
}
return map;
}

@Test
public void testIndexSummaryReplacement() throws IOException, ExecutionException, InterruptedException
{