From 8a634e22ae5353e906eb334d516db8e68f69033e Mon Sep 17 00:00:00 2001
From: Michael Marshall
Date: Thu, 2 Oct 2025 09:26:58 -0500
Subject: [PATCH 01/10] CNDB-15554: Bump jvector version
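
In the new jvector, the read-side graph type is ImmutableGraphIndex rather
than GraphIndex, and scoring views grew a contains(int, int) method (stubbed
out in TestView below). The view/reranker pattern the SAI code keeps using is
otherwise unchanged; as a sketch, with `graph`, `similarityFunction` and
`queryVector` standing in for values from the surrounding code:

    // Obtain a scoring view from a loaded index and wrap it so closing the
    // reranker also closes the view.
    var view = (ImmutableGraphIndex.ScoringView) graph.getView();
    var reranker = new CloseableReranker(similarityFunction, queryVector, view);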
---
 build.xml | 2 +-
 .../index/sai/disk/vector/CassandraDiskAnn.java | 10 +++++-----
 .../index/sai/disk/vector/CloseableReranker.java | 8 ++++----
 .../sai/disk/vector/BruteForceRowIdIteratorTest.java | 12 +++++++++---
 4 files changed, 19 insertions(+), 13 deletions(-)

diff --git a/build.xml b/build.xml
index 125fa472c2af..2b7906e580b5 100644
--- a/build.xml
+++ b/build.xml
@@ -754,7 +754,7 @@
-
+
diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraDiskAnn.java b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraDiskAnn.java
index 92544f07e4bc..2e1e058e6c91 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraDiskAnn.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraDiskAnn.java
@@ -27,7 +27,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.github.jbellis.jvector.graph.GraphIndex;
+import io.github.jbellis.jvector.graph.ImmutableGraphIndex;
 import io.github.jbellis.jvector.graph.GraphSearcher;
 import io.github.jbellis.jvector.graph.disk.feature.FeatureId;
 import io.github.jbellis.jvector.graph.disk.OnDiskGraphIndex;
@@ -72,7 +72,7 @@ public class CassandraDiskAnn
     private final FileHandle graphHandle;
     private final OnDiskOrdinalsMap ordinalsMap;
     private final Set features;
-    private final GraphIndex graph;
+    private final ImmutableGraphIndex graph;
     private final VectorSimilarityFunction similarityFunction;
     @Nullable
     private final CompressedVectors compressedVectors;
@@ -231,7 +231,7 @@ public CloseableIterator search(VectorFloat queryVector,
         searcher.usePruning(usePruning);
         try
         {
-            var view = (GraphIndex.ScoringView) searcher.getView();
+            var view = (ImmutableGraphIndex.ScoringView) searcher.getView();
             SearchScoreProvider ssp;
             // FusedADC can no longer be written due to jvector upgrade. However, it's possible these index files
             // still exist, so we have to support them.
@@ -311,9 +311,9 @@ public OrdinalsView getOrdinalsView()
         return ordinalsMap.getOrdinalsView();
     }
 
-    public GraphIndex.ScoringView getView()
+    public ImmutableGraphIndex.ScoringView getView()
     {
-        return (GraphIndex.ScoringView) graph.getView();
+        return (ImmutableGraphIndex.ScoringView) graph.getView();
     }
 
     public boolean containsUnitVectors()
diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/CloseableReranker.java b/src/java/org/apache/cassandra/index/sai/disk/vector/CloseableReranker.java
index b85b33c81b55..0415b9eb4bcc 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/vector/CloseableReranker.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/vector/CloseableReranker.java
@@ -20,21 +20,21 @@
 
 import java.io.Closeable;
 
-import io.github.jbellis.jvector.graph.GraphIndex;
+import io.github.jbellis.jvector.graph.ImmutableGraphIndex;
 import io.github.jbellis.jvector.graph.similarity.ScoreFunction;
 import io.github.jbellis.jvector.vector.VectorSimilarityFunction;
 import io.github.jbellis.jvector.vector.types.VectorFloat;
 import org.apache.cassandra.io.util.FileUtils;
 
 /**
- * An ExactScoreFunction that closes the underlying {@link GraphIndex.ScoringView} when closed.
+ * An ExactScoreFunction that closes the underlying {@link ImmutableGraphIndex.ScoringView} when closed.
  */
 public class CloseableReranker implements ScoreFunction.ExactScoreFunction, Closeable
 {
-    private final GraphIndex.ScoringView view;
+    private final ImmutableGraphIndex.ScoringView view;
     private final ExactScoreFunction scoreFunction;
 
-    public CloseableReranker(VectorSimilarityFunction similarityFunction, VectorFloat queryVector, GraphIndex.ScoringView view)
+    public CloseableReranker(VectorSimilarityFunction similarityFunction, VectorFloat queryVector, ImmutableGraphIndex.ScoringView view)
     {
         this.view = view;
         this.scoreFunction = view.rerankerFor(queryVector, similarityFunction);
diff --git a/test/unit/org/apache/cassandra/index/sai/disk/vector/BruteForceRowIdIteratorTest.java b/test/unit/org/apache/cassandra/index/sai/disk/vector/BruteForceRowIdIteratorTest.java
index 22658da82ac7..bdd3e0fd3958 100644
--- a/test/unit/org/apache/cassandra/index/sai/disk/vector/BruteForceRowIdIteratorTest.java
+++ b/test/unit/org/apache/cassandra/index/sai/disk/vector/BruteForceRowIdIteratorTest.java
@@ -22,7 +22,7 @@
 
 import org.junit.Test;
 
-import io.github.jbellis.jvector.graph.GraphIndex;
+import io.github.jbellis.jvector.graph.ImmutableGraphIndex;
 import io.github.jbellis.jvector.graph.NodeQueue;
 import io.github.jbellis.jvector.graph.NodesIterator;
 import io.github.jbellis.jvector.graph.similarity.ScoreFunction;
@@ -63,7 +63,7 @@ public void testBruteForceRowIdIteratorForEmptyPQAndTopKEqualsLimit()
         assertTrue(view.isClosed);
     }
 
-    private static class TestView implements GraphIndex.ScoringView
+    private static class TestView implements ImmutableGraphIndex.ScoringView
     {
         private boolean isClosed = false;
 
@@ -102,7 +102,7 @@ public int size()
     }
 
     @Override
-    public GraphIndex.NodeAtLevel entryNode()
+    public ImmutableGraphIndex.NodeAtLevel entryNode()
     {
         throw new UnsupportedOperationException();
     }
@@ -112,5 +112,11 @@ public Bits liveNodes()
     {
         throw new UnsupportedOperationException();
     }
+
+    @Override
+    public boolean contains(int i, int i1)
+    {
+        return false;
+    }
 }

From 303b56cd82e05bbeb337eeac76300307cf8e189e Mon Sep 17 00:00:00 2001
From: Michael Marshall
Date: Fri, 3 Oct 2025 14:46:55 -0500
Subject: [PATCH 02/10] Work in progress for fused adc, fails when querying
 compacted graph
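
The upgraded jvector reintroduces fused features as FUSED_PQ, replacing the
FUSED_ADC feature that could no longer be written. This adds an 'fa' index
version backed by V8OnDiskFormat (jvector file format 6; format 5 was
skipped) plus a system property gate, and wires FusedPQ into the compaction
graph writer. Checksum validation and SAI footers are temporarily commented
out while the format settles. The write path, condensed from the
CompactionGraph change below (compressedVectors must be product-quantized
here):

    // Each graph node's FusedPQ state is derived from a graph view plus the
    // trained PQ vectors.
    try (var view = builder.getGraph().getView())
    {
        var supplier = Feature.singleStateFactory(FeatureId.FUSED_PQ,
                ordinal -> new FusedPQ.State(view, (PQVectors) compressedVectors, ordinal));
        writer.write(supplier);
    }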
---
 build.xml | 2 +-
 .../config/CassandraRelevantProperties.java | 2 ++
 .../index/sai/disk/format/Version.java | 5 ++-
 .../index/sai/disk/v7/V7OnDiskFormat.java | 8 ++---
 .../index/sai/disk/v8/V8OnDiskFormat.java | 32 +++++++++++++++++++
 .../sai/disk/vector/CassandraDiskAnn.java | 6 ++--
 .../sai/disk/vector/CassandraOnHeapGraph.java | 4 ++-
 .../sai/disk/vector/CompactionGraph.java | 30 +++++++++++++----
 .../index/sai/cql/VectorSiftSmallTest.java | 3 +-
 .../vector/BruteForceRowIdIteratorTest.java | 7 ++++
 10 files changed, 80 insertions(+), 19 deletions(-)
 create mode 100644 src/java/org/apache/cassandra/index/sai/disk/v8/V8OnDiskFormat.java

diff --git a/build.xml b/build.xml
index 28152182a84c..176abf2fdec1 100644
--- a/build.xml
+++ b/build.xml
@@ -741,7 +741,7 @@
-
+
diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 436f8448332c..aeba091030e2 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -406,6 +406,8 @@ public enum CassandraRelevantProperties
     SAI_VECTOR_FLUSH_THRESHOLD_MAX_ROWS("cassandra.sai.vector_flush_threshold_max_rows", "-1"),
     // Use non-positive value to disable it. Period in millis to trigger a flush for SAI vector memtable index.
     SAI_VECTOR_FLUSH_PERIOD_IN_MILLIS("cassandra.sai.vector_flush_period_in_millis", "-1"),
+    // Whether compaction should build vector indexes using fused adc
+    SAI_VECTOR_ENABLE_FUSED_PQ("cassandra.sai.vector.enable_fused_pq", "true"),
     /**
      * Whether to disable auto-compaction
      */
diff --git a/src/java/org/apache/cassandra/index/sai/disk/format/Version.java b/src/java/org/apache/cassandra/index/sai/disk/format/Version.java
index 1017cb7a18c7..4cfc978a893c 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/format/Version.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/format/Version.java
@@ -35,6 +35,7 @@
 import org.apache.cassandra.index.sai.disk.v5.V5OnDiskFormat;
 import org.apache.cassandra.index.sai.disk.v6.V6OnDiskFormat;
 import org.apache.cassandra.index.sai.disk.v7.V7OnDiskFormat;
+import org.apache.cassandra.index.sai.disk.v8.V8OnDiskFormat;
 import org.apache.cassandra.index.sai.utils.TypeUtil;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.schema.SchemaConstants;
@@ -69,10 +70,12 @@ public class Version implements Comparable
     public static final Version EC = new Version("ec", V7OnDiskFormat.instance, (c, i, g) -> stargazerFileNameFormat(c, i, g, "ec"));
     // total terms count serialization in index metadata, enables ANN_USE_SYNTHETIC_SCORE by default
     public static final Version ED = new Version("ed", V7OnDiskFormat.instance, (c, i, g) -> stargazerFileNameFormat(c, i, g, "ed"));
+    // jvector file format version 6 (skipped 5)
+    public static final Version FA = new Version("fa", V8OnDiskFormat.instance, (c, i, g) -> stargazerFileNameFormat(c, i, g, "fa"));
 
     // These are in reverse-chronological order so that the latest version is first. Version matching tests
     // are more likely to match the latest version, so we want to test that one first.
-    public static final List ALL = Lists.newArrayList(ED, EC, EB, DC, DB, CA, BA, AA);
+    public static final List ALL = Lists.newArrayList(FA, ED, EC, EB, DC, DB, CA, BA, AA);
 
     public static final Version EARLIEST = AA;
     public static final Version VECTOR_EARLIEST = BA;
diff --git a/src/java/org/apache/cassandra/index/sai/disk/v7/V7OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/v7/V7OnDiskFormat.java
index 95b2fa80dc42..fd19af2356f4 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v7/V7OnDiskFormat.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v7/V7OnDiskFormat.java
@@ -79,10 +79,10 @@ public boolean validateIndexComponent(IndexComponent.ForRead component, boolean
             // which does not check the checksum. (The issue is in the way the checksum was computed. It didn't
             // include the header/footer bytes, and for multi-segment builds, it didn't include the bytes from
             // all previous segments, which is the design for all index components to date.)
-            if (!checksum || component.componentType() == IndexComponentType.TERMS_DATA)
-                SAICodecUtils.validate(input, getExpectedEarliestVersion(context, component.componentType()));
-            else
-                SAICodecUtils.validateChecksum(input);
+//            if (!checksum || component.componentType() == IndexComponentType.TERMS_DATA)
+//                SAICodecUtils.validate(input, getExpectedEarliestVersion(context, component.componentType()));
+//            else
+//                SAICodecUtils.validateChecksum(input);
             return true;
         }
         catch (Throwable e)
diff --git a/src/java/org/apache/cassandra/index/sai/disk/v8/V8OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/v8/V8OnDiskFormat.java
new file mode 100644
index 000000000000..aca3d530ce97
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sai/disk/v8/V8OnDiskFormat.java
@@ -0,0 +1,32 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.cassandra.index.sai.disk.v8;
+
+import org.apache.cassandra.index.sai.disk.v7.V7OnDiskFormat;
+
+public class V8OnDiskFormat extends V7OnDiskFormat
+{
+    public static final V8OnDiskFormat instance = new V8OnDiskFormat();
+
+    @Override
+    public int jvectorFileFormatVersion()
+    {
+        return 6;
+    }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraDiskAnn.java b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraDiskAnn.java
index 92544f07e4bc..a97977f30178 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraDiskAnn.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraDiskAnn.java
@@ -117,7 +117,7 @@ public CassandraDiskAnn(SSTableContext sstableContext, SegmentMetadata.Component
             }
 
             VectorCompression.CompressionType compressionType = VectorCompression.CompressionType.values()[reader.readByte()];
-            if (features.contains(FeatureId.FUSED_ADC))
+            if (features.contains(FeatureId.FUSED_PQ))
             {
                 assert compressionType == VectorCompression.CompressionType.PRODUCT_QUANTIZATION;
                 compressedVectors = null;
@@ -233,9 +233,7 @@ public CloseableIterator search(VectorFloat queryVector,
         {
             var view = (GraphIndex.ScoringView) searcher.getView();
             SearchScoreProvider ssp;
-            // FusedADC can no longer be written due to jvector upgrade. However, it's possible these index files
-            // still exist, so we have to support them.
-            if (features.contains(FeatureId.FUSED_ADC))
+            if (features.contains(FeatureId.FUSED_PQ))
             {
                 var asf = view.approximateScoreFunctionFor(queryVector, similarityFunction);
                 var rr = isRerankless ? null : view.rerankerFor(queryVector, similarityFunction);
diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java
index c42178d8aec1..1b297926118f 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java
@@ -96,6 +96,8 @@
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.lucene.util.StringHelper;
 
+import static io.github.jbellis.jvector.graph.disk.OnDiskSequentialGraphIndexWriter.FOOTER_MAGIC;
+
 public class CassandraOnHeapGraph implements Accountable
 {
     // Cassandra's PQ features, independent of JVector's
@@ -478,7 +480,7 @@ public SegmentMetadata.ComponentMetadataMap flush(IndexComponents.ForWrite perIn
         var start = System.nanoTime();
         var suppliers = Feature.singleStateFactory(FeatureId.INLINE_VECTORS, nodeId -> new InlineVectors.State(vectorValues.getVector(nodeId)));
         indexWriter.write(suppliers);
-        SAICodecUtils.writeFooter(indexWriter.getOutput(), indexWriter.checksum());
+//        SAICodecUtils.writeFooter(indexWriter.getOutput(), indexWriter.checksum());
         logger.info("Writing graph took {}ms", (System.nanoTime() - start) / 1_000_000);
 
         long termsLength = indexWriter.getOutput().position() - termsOffset;
diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java b/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java
index 2443e272340c..612a192781b5 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java
@@ -41,6 +41,7 @@
 import io.github.jbellis.jvector.graph.ListRandomAccessVectorValues;
 import io.github.jbellis.jvector.graph.disk.feature.Feature;
 import io.github.jbellis.jvector.graph.disk.feature.FeatureId;
+import io.github.jbellis.jvector.graph.disk.feature.FusedPQ;
 import io.github.jbellis.jvector.graph.disk.feature.InlineVectors;
 import io.github.jbellis.jvector.graph.disk.OnDiskGraphIndex;
 import io.github.jbellis.jvector.graph.disk.OnDiskGraphIndexWriter;
@@ -67,6 +68,7 @@
 import net.openhft.chronicle.map.ChronicleMapBuilder;
 import org.agrona.collections.Int2ObjectHashMap;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.marshal.VectorType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -88,6 +90,7 @@
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.service.StorageService;
 
+import static io.github.jbellis.jvector.graph.disk.OnDiskSequentialGraphIndexWriter.FOOTER_MAGIC;
 import static java.lang.Math.max;
 import static java.lang.Math.min;
 
@@ -109,6 +112,8 @@ public class CompactionGraph implements Closeable, Accountable
     @VisibleForTesting
     public static int PQ_TRAINING_SIZE = ProductQuantization.MAX_PQ_TRAINING_SET_SIZE;
 
+    private static boolean ENABLE_FUSED_PQ = CassandraRelevantProperties.SAI_VECTOR_ENABLE_FUSED_PQ.getBoolean();
+
     private final VectorType.VectorSerializer serializer;
     private final VectorSimilarityFunction similarityFunction;
     private final ChronicleMap, CompactionVectorPostings> postingsMap;
@@ -225,12 +230,14 @@ else if (compressor instanceof BinaryQuantization)
 
     private OnDiskGraphIndexWriter createTermsWriter(OrdinalMapper ordinalMapper) throws IOException
     {
-        return new OnDiskGraphIndexWriter.Builder(builder.getGraph(), termsFile.toPath())
+        var writerBuilder = new OnDiskGraphIndexWriter.Builder(builder.getGraph(), termsFile.toPath())
                 .withStartOffset(termsOffset)
                 .with(new InlineVectors(dimension))
                 .withVersion(Version.current().onDiskFormat().jvectorFileFormatVersion())
-                .withMapper(ordinalMapper)
-                .build();
+                .withMapper(ordinalMapper);
+        if (ENABLE_FUSED_PQ)
+            writerBuilder.with(new FusedPQ(context.getIndexWriterConfig().getAnnMaxDegree(), (ProductQuantization) compressor));
+        return writerBuilder.build();
     }
 
     @Override
@@ -453,10 +460,19 @@ public SegmentMetadata.ComponentMetadataMap flush() throws IOException
 
         // write the graph edge lists and optionally fused adc features
        var start = System.nanoTime();
-        // Required becuase jvector 3 wrote the fused adc map here. We no longer write jvector 3, but we still
-        // write out the empty map.
-        writer.write(Map.of());
-        SAICodecUtils.writeFooter(writer.getOutput(), writer.checksum());
+        if (writer.getFeatureSet().contains(FeatureId.FUSED_PQ))
+        {
+            try (var view = builder.getGraph().getView())
+            {
+                var supplier = Feature.singleStateFactory(FeatureId.FUSED_PQ, ordinal -> new FusedPQ.State(view, (PQVectors) compressedVectors, ordinal));
+                writer.write(supplier);
+            }
+        }
+        else
+        {
+            writer.write(Map.of());
+        }
+//        SAICodecUtils.writeFooter(writer.getOutput(), writer.checksum());
         logger.info("Writing graph took {}ms", (System.nanoTime() - start) / 1_000_000);
 
         long termsLength = writer.getOutput().position() - termsOffset;
diff --git a/test/unit/org/apache/cassandra/index/sai/cql/VectorSiftSmallTest.java b/test/unit/org/apache/cassandra/index/sai/cql/VectorSiftSmallTest.java
index 8fc9b37c17e8..c2da8f610239 100644
--- a/test/unit/org/apache/cassandra/index/sai/cql/VectorSiftSmallTest.java
+++ b/test/unit/org/apache/cassandra/index/sai/cql/VectorSiftSmallTest.java
@@ -156,6 +156,7 @@ public void testCompaction() throws Throwable
             assertTrue("Pre-compaction recall is " + recall, recall > 0.975);
         }
 
+        compact();
         compact();
         for (int topK : List.of(1, 100))
         {
@@ -313,7 +314,7 @@ private void createTable()
     private void createIndex()
     {
         // we need a long timeout because we are adding many vectors
-        String index = createIndexAsync("CREATE CUSTOM INDEX ON %s(val) USING 'StorageAttachedIndex' WITH OPTIONS = {'similarity_function' : 'euclidean'}");
+        String index = createIndexAsync("CREATE CUSTOM INDEX ON %s(val) USING 'StorageAttachedIndex' WITH OPTIONS = {'similarity_function' : 'euclidean', 'enable_hierarchy': 'true'}");
         waitForIndexQueryable(KEYSPACE, index, 5, TimeUnit.MINUTES);
     }
diff --git a/test/unit/org/apache/cassandra/index/sai/disk/vector/BruteForceRowIdIteratorTest.java b/test/unit/org/apache/cassandra/index/sai/disk/vector/BruteForceRowIdIteratorTest.java
index 22658da82ac7..6086d0aadae1 100644
--- a/test/unit/org/apache/cassandra/index/sai/disk/vector/BruteForceRowIdIteratorTest.java
+++ b/test/unit/org/apache/cassandra/index/sai/disk/vector/BruteForceRowIdIteratorTest.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.index.sai.disk.vector;
 
 import java.util.NoSuchElementException;
+import java.util.function.Function;
 
 import org.junit.Test;
 
@@ -95,6 +96,12 @@ public NodesIterator getNeighborsIterator(int i, int i1)
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public void processNeighbors(int i, int i1, ScoreFunction scoreFunction, Function function, GraphIndex.NeighborProcessor neighborProcessor)
+    {
+
+    }
+
     @Override
     public int size()
     {

From 05b214834c5a90456f963624e8e96b12b8fe5afe Mon Sep 17 00:00:00 2001
From: Michael Marshall
Date: Mon, 6 Oct 2025 11:50:29 -0500
Subject: [PATCH 03/10] Skip writing fused graph if version too early
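
Writing a FusedPQ feature into a graph file older than jvector format 6 would
produce an index that cannot be read back, so the writer now only attaches
the feature when the current on-disk version supports it, and logs a warning
otherwise; the property is also renamed to the more general 'enable_fused'.
The guard, condensed (writerBuilder and compressor as in the diff):

    if (ENABLE_FUSED && Version.current().onDiskFormat().jvectorFileFormatVersion() >= 6)
        writerBuilder.with(new FusedPQ(context.getIndexWriterConfig().getAnnMaxDegree(),
                                       (ProductQuantization) compressor));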
---
 .../config/CassandraRelevantProperties.java | 2 +-
 .../index/sai/disk/vector/CompactionGraph.java | 17 +++++++++++++----
 .../index/sai/cql/VectorSiftSmallTest.java | 2 +-
 3 files changed, 15 insertions(+), 6 deletions(-)

diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index aeba091030e2..059ea0728596 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -407,7 +407,7 @@ public enum CassandraRelevantProperties
     // Use non-positive value to disable it. Period in millis to trigger a flush for SAI vector memtable index.
     SAI_VECTOR_FLUSH_PERIOD_IN_MILLIS("cassandra.sai.vector_flush_period_in_millis", "-1"),
     // Whether compaction should build vector indexes using fused adc
-    SAI_VECTOR_ENABLE_FUSED_PQ("cassandra.sai.vector.enable_fused_pq", "true"),
+    SAI_VECTOR_ENABLE_FUSED("cassandra.sai.vector.enable_fused", "true"),
     /**
      * Whether to disable auto-compaction
      */
diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java b/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java
index 612a192781b5..6c940aa0d613 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java
@@ -90,7 +90,6 @@
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.service.StorageService;
 
-import static io.github.jbellis.jvector.graph.disk.OnDiskSequentialGraphIndexWriter.FOOTER_MAGIC;
 import static java.lang.Math.max;
 import static java.lang.Math.min;
 
@@ -112,7 +111,7 @@ public class CompactionGraph implements Closeable, Accountable
     @VisibleForTesting
     public static int PQ_TRAINING_SIZE = ProductQuantization.MAX_PQ_TRAINING_SET_SIZE;
 
-    private static boolean ENABLE_FUSED_PQ = CassandraRelevantProperties.SAI_VECTOR_ENABLE_FUSED_PQ.getBoolean();
+    private static boolean ENABLE_FUSED = CassandraRelevantProperties.SAI_VECTOR_ENABLE_FUSED.getBoolean();
 
     private final VectorType.VectorSerializer serializer;
     private final VectorSimilarityFunction similarityFunction;
@@ -235,8 +234,18 @@ private OnDiskGraphIndexWriter createTermsWriter(OrdinalMapper ordinalMapper) th
                 .with(new InlineVectors(dimension))
                 .withVersion(Version.current().onDiskFormat().jvectorFileFormatVersion())
                 .withMapper(ordinalMapper);
-        if (ENABLE_FUSED_PQ)
-            writerBuilder.with(new FusedPQ(context.getIndexWriterConfig().getAnnMaxDegree(), (ProductQuantization) compressor));
+        if (ENABLE_FUSED)
+        {
+            if (Version.current().onDiskFormat().jvectorFileFormatVersion() >= 6)
+            {
+                assert compressor instanceof ProductQuantization; // todo revisit this
+                writerBuilder.with(new FusedPQ(context.getIndexWriterConfig().getAnnMaxDegree(), (ProductQuantization) compressor));
+            }
+            else
+            {
+                logger.warn("Fused ADC enabled, but will not be used because on disk version is {}.", Version.current());
+            }
+        }
         return writerBuilder.build();
     }
 
diff --git a/test/unit/org/apache/cassandra/index/sai/cql/VectorSiftSmallTest.java b/test/unit/org/apache/cassandra/index/sai/cql/VectorSiftSmallTest.java
index c2da8f610239..748a6f29c364 100644
--- a/test/unit/org/apache/cassandra/index/sai/cql/VectorSiftSmallTest.java
+++ b/test/unit/org/apache/cassandra/index/sai/cql/VectorSiftSmallTest.java
@@ -43,7 +43,7 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-public class VectorSiftSmallTest extends VectorTester
+public class VectorSiftSmallTest extends VectorTester.Versioned
 {
     private static final String DATASET = "siftsmall"; // change to "sift" for larger dataset. requires manual download

From 903b76b70ff77467f34f98ad1bb3f8948db197e0 Mon Sep 17 00:00:00 2001
From: Michael Marshall
Date: Wed, 8 Oct 2025 16:43:05 -0500
Subject: [PATCH 04/10] Comment out writeHeader to fix build
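
The no-argument OnDiskGraphIndexWriter.writeHeader() appears to be gone after
the jvector bump, so the call is commented out for now with a TODO: the
header is still needed before postings can be written. The follow-up patch
restores it through the overload that takes a graph view:

    writer.writeHeader(builder.getGraph().getView());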
---
 src/java/org/apache/cassandra/index/sai/IndexContext.java | 1 -
 .../cassandra/index/sai/disk/vector/CompactionGraph.java | 3 ++-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/java/org/apache/cassandra/index/sai/IndexContext.java b/src/java/org/apache/cassandra/index/sai/IndexContext.java
index f6eb4a974f94..020a027b50d2 100644
--- a/src/java/org/apache/cassandra/index/sai/IndexContext.java
+++ b/src/java/org/apache/cassandra/index/sai/IndexContext.java
@@ -38,7 +38,6 @@
 import org.slf4j.LoggerFactory;
 
 import io.github.jbellis.jvector.vector.VectorSimilarityFunction;
-import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.cql3.statements.schema.IndexTarget;
 import org.apache.cassandra.db.ClusteringComparator;
diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java b/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java
index 6c940aa0d613..c666d5cc526b 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java
@@ -388,7 +388,8 @@ public long addGraphNode(InsertionResult result)
     public SegmentMetadata.ComponentMetadataMap flush() throws IOException
     {
         // header is required to write the postings, but we need to recreate the writer after that with an accurate OrdinalMapper
-        writer.writeHeader();
+        // TODO what to do about this? Removing this line makes the tests pass, but with it, we get an error when loading it
+//        writer.writeHeader();
         writer.close();
 
         int nInProgress = builder.insertsInProgress();

From 3afcb241e8098b17d3f008abc53d5d785bf332f2 Mon Sep 17 00:00:00 2001
From: Michael Marshall
Date: Thu, 9 Oct 2025 14:37:51 -0500
Subject: [PATCH 05/10] Add writeHeader back in to fix failing compaction test

---
 .../apache/cassandra/index/sai/disk/vector/CompactionGraph.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java b/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java
index c666d5cc526b..67ae4631d1dd 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java
@@ -389,7 +389,7 @@ public SegmentMetadata.ComponentMetadataMap flush() throws IOException
     {
         // header is required to write the postings, but we need to recreate the writer after that with an accurate OrdinalMapper
         // TODO what to do about this? Removing this line makes the tests pass, but with it, we get an error when loading it
-//        writer.writeHeader();
+        writer.writeHeader(builder.getGraph().getView());
         writer.close();
 
         int nInProgress = builder.insertsInProgress();

From 6859092980acb6d452d2afc07d4966fd98a198b3 Mon Sep 17 00:00:00 2001
From: Michael Marshall
Date: Thu, 9 Oct 2025 16:57:00 -0500
Subject: [PATCH 06/10] CNDB-15640: Determine if vectors are unit length at
 insert
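
Previously the unit-vector flag was computed at flush time with a parallel
pass over every vector. Track it incrementally instead: assume unit length,
and flip the flag the first time an inserted vector fails the same epsilon
test the old IntStream pass used. Source models can also declare their
vectors as known unit length. Per insert:

    // One non-unit vector permanently flips the flag for this graph.
    if (!(Math.abs(VectorUtil.dotProduct(vector, vector) - 1.0f) < 0.01))
        unitVectors = false;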
---
 .../sai/disk/vector/CassandraOnHeapGraph.java | 17 +++++---
 .../sai/disk/vector/VectorSourceModel.java | 42 +++++++++++++------
 2 files changed, 40 insertions(+), 19 deletions(-)

diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java
index c42178d8aec1..9a1a522fe00b 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java
@@ -125,6 +125,7 @@ public enum PQVersion {
     private final InvalidVectorBehavior invalidVectorBehavior;
     private final IntHashSet deletedOrdinals;
     private volatile boolean hasDeletions;
+    private volatile boolean unitVectors;
 
     // we don't need to explicitly close these since only on-heap resources are involved
     private final ThreadLocal searchers;
@@ -157,6 +158,9 @@ public CassandraOnHeapGraph(IndexContext context, boolean forSearching, Memtable
         vectorsByKey = forSearching ? new NonBlockingHashMap<>() : null;
         invalidVectorBehavior = forSearching ? InvalidVectorBehavior.FAIL : InvalidVectorBehavior.IGNORE;
 
+        // We start by assuming the vectors are unit vectors and then if they are not, we will correct it.
+        unitVectors = true;
+
         int jvectorVersion = Version.current().onDiskFormat().jvectorFileFormatVersion();
         // This is only a warning since it's not a fatal error to write without hierarchy
         if (indexConfig.isHierarchyEnabled() && jvectorVersion < 4)
@@ -269,6 +273,12 @@ public long add(ByteBuffer term, T key)
             var success = postingsByOrdinal.compareAndPut(ordinal, null, postings);
             assert success : "postingsByOrdinal already contains an entry for ordinal " + ordinal;
             bytesUsed += builder.addGraphNode(ordinal, vector);
+
+            // We safely added to the graph, check if we need to check for unit length
+            if (sourceModel.hasKnownUnitLengthVectors() || unitVectors)
+                if (!(Math.abs(VectorUtil.dotProduct(vector, vector) - 1.0f) < 0.01))
+                    unitVectors = false;
+
             return bytesUsed;
         }
         else
@@ -580,15 +590,10 @@ private long writePQ(SequentialWriter writer, V5VectorPostingsWriter.RemappedPos
             // encode (compress) the vectors to save
             if (compressor != null)
                 cv = compressor.encodeAll(new RemappedVectorValues(remapped, remapped.maxNewOrdinal, vectorValues));
-
-            containsUnitVectors = IntStream.range(0, vectorValues.size())
-                                           .parallel()
-                                           .mapToObj(vectorValues::getVector)
-                                           .allMatch(v -> Math.abs(VectorUtil.dotProduct(v, v) - 1.0f) < 0.01);
         }
 
         var actualType = compressor == null ? CompressionType.NONE : preferredCompression.type;
-        writePqHeader(writer, containsUnitVectors, actualType);
+        writePqHeader(writer, unitVectors, actualType);
         if (actualType == CompressionType.NONE)
             return writer.position();
diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/VectorSourceModel.java b/src/java/org/apache/cassandra/index/sai/disk/vector/VectorSourceModel.java
index bf08896591dd..bccfd8e1352a 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/vector/VectorSourceModel.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/vector/VectorSourceModel.java
@@ -31,18 +31,19 @@
 import static org.apache.cassandra.index.sai.disk.vector.VectorCompression.CompressionType.BINARY_QUANTIZATION;
 import static org.apache.cassandra.index.sai.disk.vector.VectorCompression.CompressionType.NONE;
 import static org.apache.cassandra.index.sai.disk.vector.VectorCompression.CompressionType.PRODUCT_QUANTIZATION;
-
 public enum VectorSourceModel
 {
-    ADA002((dimension) -> new VectorCompression(PRODUCT_QUANTIZATION, dimension, 0.125), 1.25),
-    OPENAI_V3_SMALL((dimension) -> new VectorCompression(PRODUCT_QUANTIZATION, dimension, 0.0625), 1.5),
-    OPENAI_V3_LARGE((dimension) -> new VectorCompression(PRODUCT_QUANTIZATION, dimension, 0.0625), 1.25),
-    BERT(COSINE, (dimension) -> new VectorCompression(PRODUCT_QUANTIZATION, dimension, 0.25), __ -> 1.0),
-    GECKO((dimension) -> new VectorCompression(PRODUCT_QUANTIZATION, dimension, 0.125), 1.25),
-    NV_QA_4((dimension) -> new VectorCompression(PRODUCT_QUANTIZATION, dimension, 0.125), 1.25),
-    COHERE_V3((dimension) -> new VectorCompression(PRODUCT_QUANTIZATION, dimension, 0.0625), 1.25),
-
-    OTHER(COSINE, VectorSourceModel::genericCompressionFor, VectorSourceModel::genericOverquery);
+    ADA002((dimension) -> new VectorCompression(PRODUCT_QUANTIZATION, dimension, 0.125), 1.25, true),
+    OPENAI_V3_SMALL((dimension) -> new VectorCompression(PRODUCT_QUANTIZATION, dimension, 0.0625), 1.5, true),
+    OPENAI_V3_LARGE((dimension) -> new VectorCompression(PRODUCT_QUANTIZATION, dimension, 0.0625), 1.25, true),
+    // BERT is not known to have unit length vectors in all cases
+    BERT(COSINE, (dimension) -> new VectorCompression(PRODUCT_QUANTIZATION, dimension, 0.25), __ -> 1.0, false),
+    GECKO((dimension) -> new VectorCompression(PRODUCT_QUANTIZATION, dimension, 0.125), 1.25, true),
+    NV_QA_4((dimension) -> new VectorCompression(PRODUCT_QUANTIZATION, dimension, 0.125), 1.25, false),
+    // Cohere does not officially say they have unit length vectors, but some users report that they do
+    COHERE_V3((dimension) -> new VectorCompression(PRODUCT_QUANTIZATION, dimension, 0.0625), 1.25, false),
+
+    OTHER(COSINE, VectorSourceModel::genericCompressionFor, VectorSourceModel::genericOverquery, false);
 
     /**
      * Default similarity function for this model.
@@ -58,18 +59,33 @@ public enum VectorSourceModel
      */
     public final Function overqueryProvider;
 
-    VectorSourceModel(Function compressionProvider, double overqueryFactor)
+    /**
+     * Indicates that the model is known to have unit length vectors. When false, unit length is checked per graph
+     * at runtime until a non-unit-length vector is found.
+     */
+    private final boolean knownUnitLength;
+
+    VectorSourceModel(Function compressionProvider,
+                      double overqueryFactor,
+                      boolean knownUnitLength)
     {
-        this(DOT_PRODUCT, compressionProvider, __ -> overqueryFactor);
+        this(DOT_PRODUCT, compressionProvider, __ -> overqueryFactor, knownUnitLength);
     }
 
     VectorSourceModel(VectorSimilarityFunction defaultSimilarityFunction,
                       Function compressionProvider,
-                      Function overqueryProvider)
+                      Function overqueryProvider,
+                      boolean knownUnitLength)
     {
         this.defaultSimilarityFunction = defaultSimilarityFunction;
         this.compressionProvider = compressionProvider;
         this.overqueryProvider = overqueryProvider;
+        this.knownUnitLength = knownUnitLength;
+    }
+
+    public boolean hasKnownUnitLengthVectors()
+    {
+        return knownUnitLength;
     }
 
     public static VectorSourceModel fromString(String value)

From db9cfab79e7fa72eee53abeaf42270a7c07fc023 Mon Sep 17 00:00:00 2001
From: Michael Marshall
Date: Fri, 10 Oct 2025 13:57:10 -0500
Subject: [PATCH 07/10] Remove unused variable

---
 .../cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java
index 9a1a522fe00b..37d34d69af15 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java
@@ -570,7 +570,6 @@ private long writePQ(SequentialWriter writer, V5VectorPostingsWriter.RemappedPos
         // Build encoder and compress vectors
         VectorCompressor compressor; // will be null if we can't compress
         CompressedVectors cv = null;
-        boolean containsUnitVectors;
         // limit the PQ computation and encoding to one index at a time -- goal during flush is to
         // evict from memory ASAP so better to do the PQ build (in parallel) one at a time
         synchronized (CassandraOnHeapGraph.class)

From 51c33265759ac766602246df3d19d41ec7d47cd3 Mon Sep 17 00:00:00 2001
From: Michael Marshall
Date: Wed, 15 Oct 2025 12:10:22 -0500
Subject: [PATCH 08/10] Save progress on COHG (doesn't work yet)
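
COHG is CassandraOnHeapGraph: this starts mirroring the CompactionGraph fused
write on the memtable-flush path. The graph writer can only be constructed
once PQ training has produced a quantizer, because the FusedPQ feature is
attached at writer-build time, so flush() now computes PQ first and creates
the OnDiskGraphIndexWriter afterwards. Condensed flow (names as in the diff,
error handling elided):

    // PQ first, then the writer, then the graph with per-node feature states.
    CompressedVectors cv = writePQ(pqOutput.asSequentialWriter(), remappedPostings, context);
    try (var indexWriter = createIndexWriter(indexFile, termsOffset, context, ordinalMapper, cv);
         var view = builder.getGraph().getView())
    {
        indexWriter.write(suppliers(view, (PQVectors) cv)); // inline vectors + optional FusedPQ
    }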
---
 .../sai/disk/vector/CassandraOnHeapGraph.java | 91 ++++++++++++-------
 .../sai/disk/vector/CompactionGraph.java | 16 +---
 2 files changed, 63 insertions(+), 44 deletions(-)

diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java
index 814e70ad6d33..22a9c5afa5ea 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java
@@ -25,15 +25,16 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.EnumMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
+import java.util.function.IntFunction;
 import java.util.function.IntUnaryOperator;
 import java.util.function.ToIntFunction;
-import java.util.stream.IntStream;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -42,17 +43,20 @@
 import io.github.jbellis.jvector.graph.GraphIndexBuilder;
 import io.github.jbellis.jvector.graph.GraphSearcher;
+import io.github.jbellis.jvector.graph.ImmutableGraphIndex;
 import io.github.jbellis.jvector.graph.RandomAccessVectorValues;
 import io.github.jbellis.jvector.graph.SearchResult;
 import io.github.jbellis.jvector.graph.disk.OnDiskGraphIndexWriter;
 import io.github.jbellis.jvector.graph.disk.OrdinalMapper;
 import io.github.jbellis.jvector.graph.disk.feature.Feature;
 import io.github.jbellis.jvector.graph.disk.feature.FeatureId;
+import io.github.jbellis.jvector.graph.disk.feature.FusedPQ;
 import io.github.jbellis.jvector.graph.disk.feature.InlineVectors;
 import io.github.jbellis.jvector.graph.similarity.DefaultSearchScoreProvider;
-import io.github.jbellis.jvector.graph.similarity.SearchScoreProvider;
 import io.github.jbellis.jvector.quantization.BinaryQuantization;
 import io.github.jbellis.jvector.quantization.CompressedVectors;
+import io.github.jbellis.jvector.quantization.ImmutablePQVectors;
+import io.github.jbellis.jvector.quantization.PQVectors;
 import io.github.jbellis.jvector.quantization.ProductQuantization;
 import io.github.jbellis.jvector.quantization.VectorCompressor;
 import io.github.jbellis.jvector.util.Accountable;
@@ -66,6 +70,7 @@
 import io.github.jbellis.jvector.vector.types.VectorFloat;
 import io.github.jbellis.jvector.vector.types.VectorTypeSupport;
 import org.agrona.collections.IntHashSet;
+import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.db.compaction.CompactionSSTable;
 import org.apache.cassandra.db.marshal.VectorType;
 import org.apache.cassandra.db.memtable.Memtable;
@@ -81,7 +86,6 @@
 import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata;
 import org.apache.cassandra.index.sai.disk.v2.V2VectorIndexSearcher;
 import org.apache.cassandra.index.sai.disk.v2.V2VectorPostingsWriter;
-import org.apache.cassandra.index.sai.disk.v3.V3OnDiskFormat;
 import org.apache.cassandra.index.sai.disk.v5.V5OnDiskFormat;
 import org.apache.cassandra.index.sai.disk.v5.V5VectorPostingsWriter;
 import org.apache.cassandra.index.sai.disk.v5.V5VectorPostingsWriter.Structure;
@@ -89,6 +93,7 @@
 import org.apache.cassandra.index.sai.metrics.ColumnQueryMetrics;
 import org.apache.cassandra.index.sai.utils.PrimaryKeyWithSortKey;
 import org.apache.cassandra.index.sai.utils.SAICodecUtils;
+import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.io.util.SequentialWriter;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.tracing.Tracing;
@@ -96,8 +101,6 @@
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.lucene.util.StringHelper;
 
-import static io.github.jbellis.jvector.graph.disk.OnDiskSequentialGraphIndexWriter.FOOTER_MAGIC;
-
 public class CassandraOnHeapGraph implements Accountable
 {
     // Cassandra's PQ features, independent of JVector's
@@ -106,6 +109,9 @@ public enum PQVersion {
         V1, // includes unit vector calculation
     }
 
+    /** whether to use fused ADC when writing indexes (assuming all other conditions are met) */
+    private static boolean ENABLE_FUSED = CassandraRelevantProperties.SAI_VECTOR_ENABLE_FUSED.getBoolean();
+
     /** minimum number of rows to perform PQ codebook generation */
    public static final int MIN_PQ_ROWS = 1024;
 
@@ -450,24 +456,15 @@ public SegmentMetadata.ComponentMetadataMap flush(IndexComponents.ForWrite perIn
         if (indexFile.exists())
             termsOffset += indexFile.length();
         try (var pqOutput = perIndexComponents.addOrGet(IndexComponentType.PQ).openOutput(true);
-             var postingsOutput = perIndexComponents.addOrGet(IndexComponentType.POSTING_LISTS).openOutput(true);
-             var indexWriter = new OnDiskGraphIndexWriter.Builder(builder.getGraph(), indexFile.toPath())
-                               .withStartOffset(termsOffset)
-                               .withVersion(Version.current().onDiskFormat().jvectorFileFormatVersion())
-                               .withMapper(ordinalMapper)
-                               .with(new InlineVectors(vectorValues.dimension()))
-                               .build())
+             var postingsOutput = perIndexComponents.addOrGet(IndexComponentType.POSTING_LISTS).openOutput(true))
         {
             SAICodecUtils.writeHeader(pqOutput);
             SAICodecUtils.writeHeader(postingsOutput);
-            indexWriter.getOutput().seek(indexFile.length()); // position at the end of the previous segment before writing our own header
-            SAICodecUtils.writeHeader(SAICodecUtils.toLuceneOutput(indexWriter.getOutput()));
-            assert indexWriter.getOutput().position() == termsOffset : "termsOffset " + termsOffset + " != " + indexWriter.getOutput().position();
 
             // compute and write PQ
             long pqOffset = pqOutput.getFilePointer();
-            long pqPosition = writePQ(pqOutput.asSequentialWriter(), remappedPostings, perIndexComponents.context());
-            long pqLength = pqPosition - pqOffset;
+            CompressedVectors cv = writePQ(pqOutput.asSequentialWriter(), remappedPostings, perIndexComponents.context());
+            long pqLength = pqOutput.asSequentialWriter().position() - pqOffset;
 
             // write postings
             long postingsOffset = postingsOutput.getFilePointer();
@@ -486,23 +483,46 @@
             }
             long postingsLength = postingsPosition - postingsOffset;
 
-            // write the graph
-            var start = System.nanoTime();
-            var suppliers = Feature.singleStateFactory(FeatureId.INLINE_VECTORS, nodeId -> new InlineVectors.State(vectorValues.getVector(nodeId)));
-            indexWriter.write(suppliers);
+            try (var indexWriter = createIndexWriter(indexFile, termsOffset, perIndexComponents.context(), ordinalMapper, cv);
+                 var view = builder.getGraph().getView())
+            {
+                indexWriter.getOutput().seek(indexFile.length()); // position at the end of the previous segment before writing our own header
+                SAICodecUtils.writeHeader(SAICodecUtils.toLuceneOutput(indexWriter.getOutput()));
+                assert indexWriter.getOutput().position() == termsOffset : "termsOffset " + termsOffset + " != " + indexWriter.getOutput().position();
+
+                // write the graph
+                var start = System.nanoTime();
+                System.out.println("Writing graph");
+                indexWriter.write(suppliers(view, (PQVectors) cv));
+                System.out.println("Done writing graph");
 //        SAICodecUtils.writeFooter(indexWriter.getOutput(), indexWriter.checksum());
-            logger.info("Writing graph took {}ms", (System.nanoTime() - start) / 1_000_000);
-            long termsLength = indexWriter.getOutput().position() - termsOffset;
+                logger.info("Writing graph took {}ms", (System.nanoTime() - start) / 1_000_000);
+                long termsLength = indexWriter.getOutput().position() - termsOffset;
 
-            // write remaining footers/checksums
-            SAICodecUtils.writeFooter(pqOutput);
-            SAICodecUtils.writeFooter(postingsOutput);
+                // write remaining footers/checksums
+                SAICodecUtils.writeFooter(pqOutput);
+                SAICodecUtils.writeFooter(postingsOutput);
 
-            // add components to the metadata map
-            return createMetadataMap(termsOffset, termsLength, postingsOffset, postingsLength, pqOffset, pqLength);
+                // add components to the metadata map
+                return createMetadataMap(termsOffset, termsLength, postingsOffset, postingsLength, pqOffset, pqLength);
+            }
         }
     }
 
+    private OnDiskGraphIndexWriter createIndexWriter(File indexFile, long termsOffset, IndexContext context, OrdinalMapper ordinalMapper, CompressedVectors compressor) throws IOException
+    {
+        var indexWriterBuilder = new OnDiskGraphIndexWriter.Builder(builder.getGraph(), indexFile.toPath())
+                                 .withStartOffset(termsOffset)
+                                 .withVersion(Version.current().onDiskFormat().jvectorFileFormatVersion())
+                                 .withMapper(ordinalMapper)
+                                 .with(new InlineVectors(vectorValues.dimension()));
+
+        if (ENABLE_FUSED && compressor instanceof ImmutablePQVectors && Version.current().onDiskFormat().jvectorFileFormatVersion() >= 6)
+            indexWriterBuilder.with(new FusedPQ(context.getIndexWriterConfig().getAnnMaxDegree(), (ProductQuantization) compressor.getCompressor()));
+
+        return indexWriterBuilder.build();
+    }
+
     static SegmentMetadata.ComponentMetadataMap createMetadataMap(long termsOffset, long termsLength, long postingsOffset, long postingsLength, long pqOffset, long pqLength)
     {
         SegmentMetadata.ComponentMetadataMap metadataMap = new SegmentMetadata.ComponentMetadataMap();
@@ -513,6 +533,15 @@ static SegmentMetadata.ComponentMetadataMap createMetadataMap(long termsOffset,
         return metadataMap;
     }
 
+    private EnumMap> suppliers(ImmutableGraphIndex.View view, PQVectors pqVectors)
+    {
+        var features = new EnumMap>(FeatureId.class);
+        features.put(FeatureId.INLINE_VECTORS, nodeId -> new InlineVectors.State(vectorValues.getVector(nodeId)));
+        if (pqVectors != null && pqVectors.getCompressor() != null && ENABLE_FUSED && Version.current().onDiskFormat().jvectorFileFormatVersion() >= 6)
+            features.put(FeatureId.FUSED_PQ, nodeId -> new FusedPQ.State(view, pqVectors, nodeId));
+        return features;
+    }
+
     /**
      * Return the best previous CompressedVectors for this column that matches the `matcher` predicate.
      * "Best" means the most recent one that hits the row count target of {@link ProductQuantization#MAX_PQ_TRAINING_SET_SIZE},
@@ -565,7 +594,7 @@ public static PqInfo getPqIfPresent(IndexContext indexContext, Function
 
-    private long writePQ(SequentialWriter writer, V5VectorPostingsWriter.RemappedPostings remapped, IndexContext indexContext) throws IOException
+    private CompressedVectors writePQ(SequentialWriter writer, V5VectorPostingsWriter.RemappedPostings remapped, IndexContext indexContext) throws IOException
     {
         var preferredCompression = sourceModel.compressionProvider.apply(vectorValues.dimension());
diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java b/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java
index 67ae4631d1dd..5abb95f7a51d 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java
@@ -234,17 +233,7 @@ private OnDiskGraphIndexWriter createTermsWriter(OrdinalMapper ordinalMapper) th
                 .with(new InlineVectors(dimension))
                 .withVersion(Version.current().onDiskFormat().jvectorFileFormatVersion())
                 .withMapper(ordinalMapper);
-        if (ENABLE_FUSED)
-        {
-            if (Version.current().onDiskFormat().jvectorFileFormatVersion() >= 6)
-            {
-                assert compressor instanceof ProductQuantization; // todo revisit this
-                writerBuilder.with(new FusedPQ(context.getIndexWriterConfig().getAnnMaxDegree(), (ProductQuantization) compressor));
-            }
-            else
-            {
-                logger.warn("Fused ADC enabled, but will not be used because on disk version is {}.", Version.current());
-            }
-        }
+        if (ENABLE_FUSED && compressor instanceof ProductQuantization && Version.current().onDiskFormat().jvectorFileFormatVersion() >= 6)
+            writerBuilder.with(new FusedPQ(context.getIndexWriterConfig().getAnnMaxDegree(), (ProductQuantization) compressor));
         return writerBuilder.build();
     }
@@ -389,7 +379,7 @@ public SegmentMetadata.ComponentMetadataMap flush() throws IOException
     {
         // header is required to write the postings, but we need to recreate the writer after that with an accurate OrdinalMapper
         // TODO what to do about this? Removing this line makes the tests pass, but with it, we get an error when loading it
-        writer.writeHeader(builder.getGraph().getView());
+//        writer.writeHeader(builder.getGraph().getView());
         writer.close();
 
         int nInProgress = builder.insertsInProgress();

From 2dd5a4f9f51684db6813d9e02d2ddf5bf5da504e Mon Sep 17 00:00:00 2001
From: Michael Marshall
Date: Wed, 15 Oct 2025 16:15:04 -0500
Subject: [PATCH 09/10] Save progress (works now, but need to consider
 refactoring)
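
This restores the checksum validation and SAI footers commented out earlier
in the series, and switches graph loading to the new three-argument
OnDiskGraphIndex.load(reader, offset, false). writePQ now hands back the
trained compressor when a fused write is planned, rather than a serialized
PQVectors, so the graph writer can encode vectors on demand. The on-demand
encoding, with generics restored for readability (`quantization` is the
ProductQuantization returned by writePQ):

    IntFunction<ByteSequence<?>> func = oldNodeId -> quantization.encode(vectorValues.getVector(oldNodeId));
    features.put(FeatureId.FUSED_PQ, nodeId -> new FusedPQ.State(view, func, nodeId));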
---
 .../index/sai/disk/v7/V7OnDiskFormat.java | 8 ++--
 .../sai/disk/vector/CassandraDiskAnn.java | 2 +-
 .../sai/disk/vector/CassandraOnHeapGraph.java | 45 ++++++++++++-------
 .../sai/disk/vector/CompactionGraph.java | 2 +-
 4 files changed, 36 insertions(+), 21 deletions(-)

diff --git a/src/java/org/apache/cassandra/index/sai/disk/v7/V7OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/v7/V7OnDiskFormat.java
index fd19af2356f4..95b2fa80dc42 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v7/V7OnDiskFormat.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v7/V7OnDiskFormat.java
@@ -79,10 +79,10 @@ public boolean validateIndexComponent(IndexComponent.ForRead component, boolean
             // which does not check the checksum. (The issue is in the way the checksum was computed. It didn't
             // include the header/footer bytes, and for multi-segment builds, it didn't include the bytes from
             // all previous segments, which is the design for all index components to date.)
-//            if (!checksum || component.componentType() == IndexComponentType.TERMS_DATA)
-//                SAICodecUtils.validate(input, getExpectedEarliestVersion(context, component.componentType()));
-//            else
-//                SAICodecUtils.validateChecksum(input);
+            if (!checksum || component.componentType() == IndexComponentType.TERMS_DATA)
+                SAICodecUtils.validate(input, getExpectedEarliestVersion(context, component.componentType()));
+            else
+                SAICodecUtils.validateChecksum(input);
             return true;
         }
         catch (Throwable e)
diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraDiskAnn.java b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraDiskAnn.java
index e833b1a07ccb..3f3a74c27fd9 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraDiskAnn.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraDiskAnn.java
@@ -94,7 +94,7 @@ public CassandraDiskAnn(SSTableContext sstableContext, SegmentMetadata.Component
         SegmentMetadata.ComponentMetadata termsMetadata = this.componentMetadatas.get(IndexComponentType.TERMS_DATA);
         graphHandle = indexFiles.termsData();
 
-        var rawGraph = OnDiskGraphIndex.load(graphHandle::createReader, termsMetadata.offset);
+        var rawGraph = OnDiskGraphIndex.load(graphHandle::createReader, termsMetadata.offset, false);
         features = rawGraph.getFeatureSet();
         graph = rawGraph;
 
diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java
index 22a9c5afa5ea..6406ec11b9c0 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java
@@ -67,6 +67,7 @@
 import io.github.jbellis.jvector.vector.VectorSimilarityFunction;
 import io.github.jbellis.jvector.vector.VectorUtil;
 import io.github.jbellis.jvector.vector.VectorizationProvider;
+import io.github.jbellis.jvector.vector.types.ByteSequence;
 import io.github.jbellis.jvector.vector.types.VectorFloat;
 import io.github.jbellis.jvector.vector.types.VectorTypeSupport;
 import org.agrona.collections.IntHashSet;
@@ -461,9 +462,12 @@ public SegmentMetadata.ComponentMetadataMap flush(IndexComponents.ForWrite perIn
             SAICodecUtils.writeHeader(pqOutput);
             SAICodecUtils.writeHeader(postingsOutput);
 
+            // Write fused unless we don't meet some criteria
+            boolean attemptWritingFused = ENABLE_FUSED && Version.current().onDiskFormat().jvectorFileFormatVersion() >= 6;
+
             // compute and write PQ
             long pqOffset = pqOutput.getFilePointer();
-            CompressedVectors cv = writePQ(pqOutput.asSequentialWriter(), remappedPostings, perIndexComponents.context());
+            var compressor = writePQ(pqOutput.asSequentialWriter(), remappedPostings, perIndexComponents.context(), attemptWritingFused);
             long pqLength = pqOutput.asSequentialWriter().position() - pqOffset;
 
             // write postings
@@ -483,7 +487,7 @@
             }
             long postingsLength = postingsPosition - postingsOffset;
 
-            try (var indexWriter = createIndexWriter(indexFile, termsOffset, perIndexComponents.context(), ordinalMapper, cv);
+            try (var indexWriter = createIndexWriter(indexFile, termsOffset, perIndexComponents.context(), ordinalMapper, compressor);
                  var view = builder.getGraph().getView())
             {
                 indexWriter.getOutput().seek(indexFile.length()); // position at the end of the previous segment before writing our own header
@@ -492,10 +496,8 @@
 
                 // write the graph
                 var start = System.nanoTime();
-                System.out.println("Writing graph");
-                indexWriter.write(suppliers(view, (PQVectors) cv));
-                System.out.println("Done writing graph");
-//        SAICodecUtils.writeFooter(indexWriter.getOutput(), indexWriter.checksum());
+                indexWriter.write(suppliers(view, compressor));
+                SAICodecUtils.writeFooter(indexWriter.getOutput(), indexWriter.checksum());
                 logger.info("Writing graph took {}ms", (System.nanoTime() - start) / 1_000_000);
                 long termsLength = indexWriter.getOutput().position() - termsOffset;
 
@@ -509,7 +511,7 @@
         }
     }
 
-    private OnDiskGraphIndexWriter createIndexWriter(File indexFile, long termsOffset, IndexContext context, OrdinalMapper ordinalMapper, CompressedVectors compressor) throws IOException
+    private OnDiskGraphIndexWriter createIndexWriter(File indexFile, long termsOffset, IndexContext context, OrdinalMapper ordinalMapper, VectorCompressor compressor) throws IOException
     {
         var indexWriterBuilder = new OnDiskGraphIndexWriter.Builder(builder.getGraph(), indexFile.toPath())
                                  .withStartOffset(termsOffset)
@@ -517,8 +519,8 @@
                                  .withMapper(ordinalMapper)
                                  .with(new InlineVectors(vectorValues.dimension()));
 
-        if (ENABLE_FUSED && compressor instanceof ImmutablePQVectors && Version.current().onDiskFormat().jvectorFileFormatVersion() >= 6)
-            indexWriterBuilder.with(new FusedPQ(context.getIndexWriterConfig().getAnnMaxDegree(), (ProductQuantization) compressor.getCompressor()));
+        if (ENABLE_FUSED && compressor instanceof ProductQuantization && Version.current().onDiskFormat().jvectorFileFormatVersion() >= 6)
+            indexWriterBuilder.with(new FusedPQ(context.getIndexWriterConfig().getAnnMaxDegree(), (ProductQuantization) compressor));
 
         return indexWriterBuilder.build();
     }
@@ -533,12 +535,19 @@ static SegmentMetadata.ComponentMetadataMap createMetadataMap(long termsOffset,
         return metadataMap;
     }
 
-    private EnumMap> suppliers(ImmutableGraphIndex.View view, PQVectors pqVectors)
+    private EnumMap> suppliers(ImmutableGraphIndex.View view, VectorCompressor compressor)
     {
         var features = new EnumMap>(FeatureId.class);
         features.put(FeatureId.INLINE_VECTORS, nodeId -> new InlineVectors.State(vectorValues.getVector(nodeId)));
-        if (pqVectors != null && pqVectors.getCompressor() != null && ENABLE_FUSED && Version.current().onDiskFormat().jvectorFileFormatVersion() >= 6)
-            features.put(FeatureId.FUSED_PQ, nodeId -> new FusedPQ.State(view, pqVectors, nodeId));
+        if (ENABLE_FUSED && Version.current().onDiskFormat().jvectorFileFormatVersion() >= 6)
+        {
+            if (compressor instanceof ProductQuantization)
+            {
+                ProductQuantization quantization = (ProductQuantization) compressor;
+                IntFunction> func = (oldNodeId) -> quantization.encode(vectorValues.getVector(oldNodeId));
+                features.put(FeatureId.FUSED_PQ, nodeId -> new FusedPQ.State(view, func, nodeId));
+            }
+        }
         return features;
     }
 
@@ -594,7 +603,7 @@ public static PqInfo getPqIfPresent(IndexContext indexContext, Function
 
-    private CompressedVectors writePQ(SequentialWriter writer, V5VectorPostingsWriter.RemappedPostings remapped, IndexContext indexContext) throws IOException
+    private VectorCompressor writePQ(SequentialWriter writer, V5VectorPostingsWriter.RemappedPostings remapped, IndexContext indexContext, boolean attemptWritingFused) throws IOException
     {
         var preferredCompression = sourceModel.compressionProvider.apply(vectorValues.dimension());
@@ -618,7 +627,7 @@
             }
             assert !vectorValues.isValueShared();
             // encode (compress) the vectors to save
-            if (compressor != null)
+            if ((compressor instanceof ProductQuantization && !attemptWritingFused) || compressor instanceof BinaryQuantization)
                 cv = compressor.encodeAll(new RemappedVectorValues(remapped, remapped.maxNewOrdinal, vectorValues));
         }
 
@@ -627,9 +636,15 @@
         var actualType = compressor == null ? CompressionType.NONE : preferredCompression.type;
         writePqHeader(writer, unitVectors, actualType);
         if (actualType == CompressionType.NONE)
             return null;
 
+        if (attemptWritingFused)
+        {
+            compressor.write(writer, Version.current().onDiskFormat().jvectorFileFormatVersion());
+            return compressor;
+        }
+
         // save (outside the synchronized block, this is io-bound not CPU)
         cv.write(writer, Version.current().onDiskFormat().jvectorFileFormatVersion());
-        return cv;
+        return null; // Don't need compressor in this case
     }
diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java b/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java
index 5abb95f7a51d..a17cfc650080 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java
@@ -472,7 +472,7 @@ public SegmentMetadata.ComponentMetadataMap flush() throws IOException
         {
             writer.write(Map.of());
         }
-//        SAICodecUtils.writeFooter(writer.getOutput(), writer.checksum());
+        SAICodecUtils.writeFooter(writer.getOutput(), writer.checksum());
         logger.info("Writing graph took {}ms", (System.nanoTime() - start) / 1_000_000);
 
         long termsLength = writer.getOutput().position() - termsOffset;

From e21aade6898552c0a59ed1273b4332c168806184 Mon Sep 17 00:00:00 2001
From: Michael Marshall
Date: Wed, 15 Oct 2025 16:26:21 -0500
Subject: [PATCH 10/10] Fix writing header before closing CompactionGraph's
 first ODGIW
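
Keep writeHeader ahead of close() so the postings writer can read the graph
back, and reload it with the same three-argument load used on the query path:

    writer.writeHeader(builder.getGraph().getView());
    writer.close();
    // ... later, when writing postings:
    var index = OnDiskGraphIndex.load(indexHandle::createReader, termsOffset, false);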
b/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java index a17cfc650080..206f90aa7304 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java +++ b/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java @@ -378,8 +378,7 @@ public long addGraphNode(InsertionResult result) public SegmentMetadata.ComponentMetadataMap flush() throws IOException { // header is required to write the postings, but we need to recreate the writer after that with an accurate OrdinalMapper - // TODO what to do about this? Removing this line makes the tests pass, but with it, we get an error when loading it -// writer.writeHeader(builder.getGraph().getView()); + writer.writeHeader(builder.getGraph().getView()); writer.close(); int nInProgress = builder.insertsInProgress(); @@ -415,7 +414,7 @@ public SegmentMetadata.ComponentMetadataMap flush() throws IOException var es = Executors.newSingleThreadExecutor(new NamedThreadFactory("CompactionGraphPostingsWriter")); long postingsLength; try (var indexHandle = perIndexComponents.get(IndexComponentType.TERMS_DATA).createIndexBuildTimeFileHandle(); - var index = OnDiskGraphIndex.load(indexHandle::createReader, termsOffset)) + var index = OnDiskGraphIndex.load(indexHandle::createReader, termsOffset, false)) { var postingsFuture = es.submit(() -> { // V2 doesn't support ONE_TO_MANY so force it to ZERO_OR_ONE_TO_MANY if necessary;