
Commit 477e6e8

micheal-o authored and sarutak committed
Revert "[SPARK-54106][SS] State store row checksum implementation"
This reverts commit ac717dd.

### What changes were proposed in this pull request?
Revert: ac717dd

### Why are the changes needed?
Causes tests to be slow in master.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
New tests.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #52827 from micheal-o/revert_row_check.

Authored-by: micheal-o <[email protected]>
Signed-off-by: Kousuke Saruta <[email protected]>
1 parent 0732e44 · commit 477e6e8

20 files changed: +106 -1625 lines changed


common/utils/src/main/resources/error/error-conditions.json

Lines changed: 0 additions & 7 deletions
@@ -5365,13 +5365,6 @@
     ],
     "sqlState" : "42K06"
   },
-  "STATE_STORE_ROW_CHECKSUM_VERIFICATION_FAILED" : {
-    "message" : [
-      "Row checksum verification failed for stateStore=<stateStoreId>. The row may be corrupted.",
-      "Expected checksum: <expectedChecksum>, Computed checksum: <computedChecksum>."
-    ],
-    "sqlState" : "XXKST"
-  },
   "STATE_STORE_STATE_SCHEMA_FILES_THRESHOLD_EXCEEDED" : {
     "message" : [
       "The number of state schema files <numStateSchemaFiles> exceeds the maximum number of state schema files for this query: <maxStateSchemaFiles>.",

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 0 additions & 30 deletions
@@ -2633,31 +2633,6 @@ object SQLConf {
       .checkValue(k => k >= 0, "Must be greater than or equal to 0")
       .createWithDefault(5)
 
-  val STATE_STORE_ROW_CHECKSUM_ENABLED =
-    buildConf("spark.sql.streaming.stateStore.rowChecksum.enabled")
-      .internal()
-      .doc("When true, checksum would be generated and verified for each state store row. " +
-        "This is used to detect row level corruption. " +
-        "Note: This configuration cannot be changed between query restarts " +
-        "from the same checkpoint location.")
-      .version("4.1.0")
-      .booleanConf
-      .createWithDefault(false)
-
-  val STATE_STORE_ROW_CHECKSUM_READ_VERIFICATION_RATIO =
-    buildConf("spark.sql.streaming.stateStore.rowChecksum.readVerificationRatio")
-      .internal()
-      .doc("When specified, Spark will do row checksum verification for every specified " +
-        "number of rows read from state store. The check is to ensure the row read from " +
-        "state store is not corrupt. Default is 0, which means no verification during read " +
-        "but we will still do verification when loading from checkpoint location." +
-        "Example, if you set to 1, it will do the check for every row read from the state store." +
-        "If set to 10, it will do the check for every 10th row read from the state store.")
-      .version("4.1.0")
-      .longConf
-      .checkValue(k => k >= 0, "Must be greater than or equal to 0")
-      .createWithDefault(if (Utils.isTesting) 1 else 0)
-
   val STATEFUL_SHUFFLE_PARTITIONS_INTERNAL =
     buildConf("spark.sql.streaming.internal.stateStore.partitions")
       .doc("WARN: This config is used internally and is not intended to be user-facing. This " +
@@ -6760,11 +6735,6 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
   def stateStoreCoordinatorMaxLaggingStoresToReport: Int =
     getConf(STATE_STORE_COORDINATOR_MAX_LAGGING_STORES_TO_REPORT)
 
-  def stateStoreRowChecksumEnabled: Boolean = getConf(STATE_STORE_ROW_CHECKSUM_ENABLED)
-
-  def stateStoreRowChecksumReadVerificationRatio: Long =
-    getConf(STATE_STORE_ROW_CHECKSUM_READ_VERIFICATION_RATIO)
-
  def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION)
 
  def checkpointFileChecksumEnabled: Boolean = getConf(STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED)
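For context, the two reverted options were plain SQL configs, so before this revert they could be toggled like any other streaming setting. A minimal sketch, assuming a `spark` session in scope; after this commit these keys are no longer recognized:

```scala
// Enable per-row checksums for state store rows (reverted; default was false).
spark.conf.set("spark.sql.streaming.stateStore.rowChecksum.enabled", "true")

// Verify the checksum of every 10th row read from the state store
// (0, the default, meant no verification on read).
spark.conf.set("spark.sql.streaming.stateStore.rowChecksum.readVerificationRatio", "10")
```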

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala

Lines changed: 2 additions & 4 deletions
@@ -113,8 +113,7 @@ object OffsetSeqMetadata extends Logging {
     FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION, STREAMING_AGGREGATION_STATE_FORMAT_VERSION,
     STREAMING_JOIN_STATE_FORMAT_VERSION, STATE_STORE_COMPRESSION_CODEC,
     STATE_STORE_ROCKSDB_FORMAT_VERSION, STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION,
-    PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN, STREAMING_STATE_STORE_ENCODING_FORMAT,
-    STATE_STORE_ROW_CHECKSUM_ENABLED
+    PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN, STREAMING_STATE_STORE_ENCODING_FORMAT
   )
 
   /**
@@ -160,8 +159,7 @@
     STATE_STORE_COMPRESSION_CODEC.key -> CompressionCodec.LZ4,
     STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION.key -> "false",
     PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key -> "true",
-    STREAMING_STATE_STORE_ENCODING_FORMAT.key -> "unsaferow",
-    STATE_STORE_ROW_CHECKSUM_ENABLED.key -> "false"
+    STREAMING_STATE_STORE_ENCODING_FORMAT.key -> "unsaferow"
   )
 
   def readValue[T](metadataLog: OffsetSeqMetadata, confKey: ConfigEntry[T]): String = {
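The two lists touched here control which SQL confs are recorded in the streaming offset log and which default to assume when an older checkpoint predates an entry. A minimal sketch of that resolution idea, assuming recorded confs are exposed as a plain `Map[String, String]` (names and the conf key string below are illustrative, not the actual OffsetSeqMetadata API):

```scala
// Resolve a conf on restart: prefer the value recorded in the offset log,
// otherwise fall back to the backward-compatibility default.
def resolveConf(
    recordedConfs: Map[String, String], // confs persisted in the offset log
    key: String,
    compatDefault: String): String =
  recordedConfs.getOrElse(key, compatDefault)

// Example: a checkpoint written before the encoding-format entry existed
// keeps behaving as "unsaferow" on restart.
val encodingFormat =
  resolveConf(Map.empty, "spark.sql.streaming.stateStore.encodingFormat", "unsaferow")
```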

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreMap.scala

Lines changed: 21 additions & 68 deletions
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
-import java.util.Map.Entry
-
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
@@ -29,13 +27,10 @@ import org.apache.spark.sql.types.{StructField, StructType}
 trait HDFSBackedStateStoreMap {
   def size(): Int
   def get(key: UnsafeRow): UnsafeRow
-  def put(key: UnsafeRow, value: UnsafeRowWrapper): UnsafeRowWrapper
+  def put(key: UnsafeRow, value: UnsafeRow): UnsafeRow
   def putAll(map: HDFSBackedStateStoreMap): Unit
-  def remove(key: UnsafeRow): UnsafeRowWrapper
+  def remove(key: UnsafeRow): UnsafeRow
   def iterator(): Iterator[UnsafeRowPair]
-  /** Returns entries in the underlying map and skips additional checks done by [[iterator]].
-   * [[iterator]] should be preferred over this. */
-  def entryIterator(): Iterator[Entry[UnsafeRow, UnsafeRowWrapper]]
   def prefixScan(prefixKey: UnsafeRow): Iterator[UnsafeRowPair]
 }
 
@@ -45,78 +40,50 @@ object HDFSBackedStateStoreMap {
   //   the map when the iterator was created
   // - Any updates to the map while iterating through the filtered iterator does not throw
   //   java.util.ConcurrentModificationException
-  type MapType = java.util.concurrent.ConcurrentHashMap[UnsafeRow, UnsafeRowWrapper]
+  type MapType = java.util.concurrent.ConcurrentHashMap[UnsafeRow, UnsafeRow]
 
-  def create(
-      keySchema: StructType,
-      numColsPrefixKey: Int,
-      readVerifier: Option[KeyValueIntegrityVerifier]): HDFSBackedStateStoreMap = {
+  def create(keySchema: StructType, numColsPrefixKey: Int): HDFSBackedStateStoreMap = {
     if (numColsPrefixKey > 0) {
-      new PrefixScannableHDFSBackedStateStoreMap(keySchema, numColsPrefixKey, readVerifier)
+      new PrefixScannableHDFSBackedStateStoreMap(keySchema, numColsPrefixKey)
     } else {
-      new NoPrefixHDFSBackedStateStoreMap(readVerifier)
-    }
-  }
-
-  /** Get the value row from the value wrapper and verify it */
-  def getAndVerifyValueRow(
-      key: UnsafeRow,
-      valueWrapper: UnsafeRowWrapper,
-      readVerifier: Option[KeyValueIntegrityVerifier]): UnsafeRow = {
-    Option(valueWrapper) match {
-      case Some(value) =>
-        readVerifier.foreach(_.verify(key, value))
-        value.unsafeRow()
-      case None => null
+      new NoPrefixHDFSBackedStateStoreMap()
     }
   }
 }
 
-class NoPrefixHDFSBackedStateStoreMap(private val readVerifier: Option[KeyValueIntegrityVerifier])
-  extends HDFSBackedStateStoreMap {
+class NoPrefixHDFSBackedStateStoreMap extends HDFSBackedStateStoreMap {
   private val map = new HDFSBackedStateStoreMap.MapType()
 
   override def size(): Int = map.size()
 
-  override def get(key: UnsafeRow): UnsafeRow = {
-    HDFSBackedStateStoreMap.getAndVerifyValueRow(key, map.get(key), readVerifier)
-  }
+  override def get(key: UnsafeRow): UnsafeRow = map.get(key)
 
-  override def put(key: UnsafeRow, value: UnsafeRowWrapper): UnsafeRowWrapper = map.put(key, value)
+  override def put(key: UnsafeRow, value: UnsafeRow): UnsafeRow = map.put(key, value)
 
   def putAll(other: HDFSBackedStateStoreMap): Unit = {
     other match {
       case o: NoPrefixHDFSBackedStateStoreMap => map.putAll(o.map)
-      case _ => other.entryIterator().foreach { pair => put(pair.getKey, pair.getValue) }
+      case _ => other.iterator().foreach { pair => put(pair.key, pair.value) }
     }
   }
 
-  override def remove(key: UnsafeRow): UnsafeRowWrapper = map.remove(key)
+  override def remove(key: UnsafeRow): UnsafeRow = map.remove(key)
 
   override def iterator(): Iterator[UnsafeRowPair] = {
     val unsafeRowPair = new UnsafeRowPair()
-    entryIterator().map { entry =>
-      val valueRow = HDFSBackedStateStoreMap
-        .getAndVerifyValueRow(entry.getKey, entry.getValue, readVerifier)
-      unsafeRowPair.withRows(entry.getKey, valueRow)
+    map.entrySet.asScala.iterator.map { entry =>
+      unsafeRowPair.withRows(entry.getKey, entry.getValue)
     }
   }
 
-  /** Returns entries in the underlying map and skips additional checks done by [[iterator]].
-   * [[iterator]] should be preferred over this. */
-  override def entryIterator(): Iterator[Entry[UnsafeRow, UnsafeRowWrapper]] = {
-    map.entrySet.asScala.iterator
-  }
-
   override def prefixScan(prefixKey: UnsafeRow): Iterator[UnsafeRowPair] = {
     throw SparkUnsupportedOperationException()
   }
 }
 
 class PrefixScannableHDFSBackedStateStoreMap(
     keySchema: StructType,
-    numColsPrefixKey: Int,
-    private val readVerifier: Option[KeyValueIntegrityVerifier]) extends HDFSBackedStateStoreMap {
+    numColsPrefixKey: Int) extends HDFSBackedStateStoreMap {
 
   private val map = new HDFSBackedStateStoreMap.MapType()
 
@@ -136,11 +103,9 @@ class PrefixScannableHDFSBackedStateStoreMap(
 
   override def size(): Int = map.size()
 
-  override def get(key: UnsafeRow): UnsafeRow = {
-    HDFSBackedStateStoreMap.getAndVerifyValueRow(key, map.get(key), readVerifier)
-  }
+  override def get(key: UnsafeRow): UnsafeRow = map.get(key)
 
-  override def put(key: UnsafeRow, value: UnsafeRowWrapper): UnsafeRowWrapper = {
+  override def put(key: UnsafeRow, value: UnsafeRow): UnsafeRow = {
     val ret = map.put(key, value)
 
     val prefixKey = prefixKeyProjection(key).copy()
@@ -171,11 +136,11 @@ class PrefixScannableHDFSBackedStateStoreMap(
           prefixKeyToKeysMap.put(prefixKey, newSet)
         }
 
-      case _ => other.entryIterator().foreach { pair => put(pair.getKey, pair.getValue) }
+      case _ => other.iterator().foreach { pair => put(pair.key, pair.value) }
     }
   }
 
-  override def remove(key: UnsafeRow): UnsafeRowWrapper = {
+  override def remove(key: UnsafeRow): UnsafeRow = {
     val ret = map.remove(key)
 
     if (ret != null) {
@@ -191,27 +156,15 @@ class PrefixScannableHDFSBackedStateStoreMap(
 
   override def iterator(): Iterator[UnsafeRowPair] = {
     val unsafeRowPair = new UnsafeRowPair()
-    entryIterator().map { entry =>
-      val valueRow = HDFSBackedStateStoreMap
-        .getAndVerifyValueRow(entry.getKey, entry.getValue, readVerifier)
-      unsafeRowPair.withRows(entry.getKey, valueRow)
+    map.entrySet.asScala.iterator.map { entry =>
+      unsafeRowPair.withRows(entry.getKey, entry.getValue)
     }
   }
 
-  /** Returns entries in the underlying map and skips additional checks done by [[iterator]].
-   * [[iterator]] should be preferred over this. */
-  override def entryIterator(): Iterator[Entry[UnsafeRow, UnsafeRowWrapper]] = {
-    map.entrySet.asScala.iterator
-  }
-
   override def prefixScan(prefixKey: UnsafeRow): Iterator[UnsafeRowPair] = {
     val unsafeRowPair = new UnsafeRowPair()
     prefixKeyToKeysMap.getOrDefault(prefixKey, mutable.Set.empty[UnsafeRow])
       .iterator
-      .map { keyRow =>
-        val valueRow = HDFSBackedStateStoreMap
-          .getAndVerifyValueRow(keyRow, map.get(keyRow), readVerifier)
-        unsafeRowPair.withRows(keyRow, valueRow)
-      }
+      .map { key => unsafeRowPair.withRows(key, map.get(key)) }
   }
 }
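The removed `UnsafeRowWrapper`/`KeyValueIntegrityVerifier` plumbing paired each stored value with a checksum that could be re-verified on read. As a rough, standalone conceptual sketch only (not the reverted Spark types; CRC32 is used purely as a stand-in checksum):

```scala
import java.util.zip.CRC32

// A value stored together with the checksum computed when it was written.
final case class ChecksummedValue(bytes: Array[Byte], checksum: Long) {
  // Recompute the checksum and fail loudly if the stored bytes no longer match it.
  def verified(): Array[Byte] = {
    val crc = new CRC32()
    crc.update(bytes)
    if (crc.getValue != checksum) {
      throw new IllegalStateException(
        s"Row checksum verification failed: expected $checksum, computed ${crc.getValue}")
    }
    bytes
  }
}

object ChecksummedValue {
  // Compute the checksum once at write time.
  def apply(bytes: Array[Byte]): ChecksummedValue = {
    val crc = new CRC32()
    crc.update(bytes)
    ChecksummedValue(bytes, crc.getValue)
  }
}

object ChecksumDemo extends App {
  // Wrap on put; verify (optionally, e.g. only every Nth read) on get.
  val stored = ChecksummedValue("some state row".getBytes("UTF-8"))
  assert(java.util.Arrays.equals(stored.verified(), "some state row".getBytes("UTF-8")))
}
```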
