
Commit 47063d9

flaming-archer authored and pan3793 committed
[KYUUBI #7129] Support PARQUET hive table pushdown filter
### Why are the changes needed?

Previously, the `HiveScan` class was always used to read data. When a table is determined to be of PARQUET type, the `ParquetScan` from Spark DataSource V2 can be used instead. `ParquetScan` supports filter pushdown, which `HiveScan` does not yet support. The conversion can be controlled by setting `spark.sql.kyuubi.hive.connector.read.convertMetastoreParquet`. When enabled, the data source PARQUET reader is used to process PARQUET tables created by using the HiveQL syntax, instead of Hive SerDe.

close #7129

### How was this patch tested?

Added unit test.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #7130 from flaming-archer/master_parquet_filterdown.

Closes #7129

d7059dc [tian bao] Support PARQUET hive table pushdown filter

Authored-by: tian bao <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
1 parent a1a08e7 commit 47063d9
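
Usage sketch (not part of the patch): assuming a Kyuubi Hive connector catalog registered as `hive`, as in the test suites below, and a hypothetical table name, the new flag can be toggled per session and its effect observed in the plan's PushedFilters:

// Minimal sketch; the table name pushdown_demo is hypothetical.
// With the conversion enabled (the default), Parquet Hive tables are read via
// Spark's DataSource V2 ParquetScan, so data filters appear under PushedFilters.
spark.conf.set("spark.sql.kyuubi.hive.connector.read.convertMetastoreParquet", "true")
spark.sql("CREATE TABLE hive.default.pushdown_demo (id INT, value INT) STORED AS PARQUET")
spark.sql("INSERT INTO hive.default.pushdown_demo VALUES (1, 100), (2, 200)")
// Expect non-empty PushedFilters, e.g. [IsNotNull(value), GreaterThan(value,100)].
spark.sql("EXPLAIN SELECT * FROM hive.default.pushdown_demo WHERE value > 100").show(false)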

File tree

4 files changed, +132 -5 lines


extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala

Lines changed: 5 additions & 1 deletion

@@ -33,11 +33,12 @@ import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcScanBuilder
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScanBuilder
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{BucketSpecHelper, LogicalExpressions}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap

-import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.READ_CONVERT_METASTORE_ORC
+import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.{READ_CONVERT_METASTORE_ORC, READ_CONVERT_METASTORE_PARQUET}
 import org.apache.kyuubi.spark.connector.hive.read.{HiveCatalogFileIndex, HiveScanBuilder}
 import org.apache.kyuubi.spark.connector.hive.write.HiveWriteBuilder

@@ -97,6 +98,9 @@ case class HiveTable(
     convertedProvider match {
       case Some("ORC") if sparkSession.sessionState.conf.getConf(READ_CONVERT_METASTORE_ORC) =>
         OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+      case Some("PARQUET")
+          if sparkSession.sessionState.conf.getConf(READ_CONVERT_METASTORE_PARQUET) =>
+        ParquetScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
       case _ => HiveScanBuilder(sparkSession, fileIndex, dataSchema, catalogTable)
     }
   }

extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala

Lines changed: 8 additions & 0 deletions

@@ -49,4 +49,12 @@ object KyuubiHiveConnectorConf {
       .version("1.11.0")
       .booleanConf
       .createWithDefault(true)
+
+  val READ_CONVERT_METASTORE_PARQUET =
+    buildConf("spark.sql.kyuubi.hive.connector.read.convertMetastoreParquet")
+      .doc("When enabled, the data source PARQUET reader is used to process " +
+        "PARQUET tables created by using the HiveQL syntax, instead of Hive SerDe.")
+      .version("1.11.0")
+      .booleanConf
+      .createWithDefault(true)
 }
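
Usage note (not part of the diff): the new flag defaults to true, so conversion is on out of the box. A one-line sketch of opting back into the Hive SerDe reader for the current session:

// Sketch: disable the conversion so Parquet Hive tables fall back to HiveScan.
spark.conf.set("spark.sql.kyuubi.hive.connector.read.convertMetastoreParquet", "false")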

extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala

Lines changed: 26 additions & 4 deletions

@@ -32,12 +32,13 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper._
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap

 import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.IdentifierHelper
-import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.READ_CONVERT_METASTORE_ORC
+import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.{READ_CONVERT_METASTORE_ORC, READ_CONVERT_METASTORE_PARQUET}
 import org.apache.kyuubi.spark.connector.hive.read.HiveScan

 class HiveCatalogSuite extends KyuubiHiveTest {

@@ -349,9 +350,30 @@ class HiveCatalogSuite extends KyuubiHiveTest {
     val parProps: util.Map[String, String] = new util.HashMap[String, String]()
     parProps.put(TableCatalog.PROP_PROVIDER, "parquet")
     val pt = catalog.createTable(parquet_table, schema, Array.empty[Transform], parProps)
-    val parScan = pt.asInstanceOf[HiveTable]
-      .newScanBuilder(CaseInsensitiveStringMap.empty()).build().asInstanceOf[HiveScan]
-    assert(parScan.isSplitable(new Path("empty")))
+
+    Seq("true", "false").foreach { value =>
+      withSparkSession(Map(READ_CONVERT_METASTORE_PARQUET.key -> value)) { _ =>
+        val scan = pt.asInstanceOf[HiveTable]
+          .newScanBuilder(CaseInsensitiveStringMap.empty()).build()
+
+        val parScan = value match {
+          case "true" =>
+            assert(
+              scan.isInstanceOf[ParquetScan],
+              s"Expected ParquetScan, got ${scan.getClass.getSimpleName}")
+            scan.asInstanceOf[ParquetScan]
+          case "false" =>
+            assert(
+              scan.isInstanceOf[HiveScan],
+              s"Expected HiveScan, got ${scan.getClass.getSimpleName}")
+            scan.asInstanceOf[HiveScan]
+          case _ =>
+            throw new IllegalArgumentException(
+              s"Unexpected value: '$value'. Only 'true' or 'false' are allowed.")
+        }
+        assert(parScan.isSplitable(new Path("empty")))
+      }
+    }

     val orc_table = Identifier.of(testNs, "orc_table")
     val orcProps: util.Map[String, String] = new util.HashMap[String, String]()

extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala

Lines changed: 93 additions & 0 deletions

@@ -353,6 +353,99 @@ class HiveQuerySuite extends KyuubiHiveTest {
     }
   }

+  test("PARQUET filter pushdown") {
+    val table = "hive.default.parquet_filter_pushdown"
+    withTable(table) {
+      spark.sql(
+        s"""
+           | CREATE TABLE $table (
+           |   id INT,
+           |   data STRING,
+           |   value INT
+           | ) PARTITIONED BY (dt STRING, region STRING)
+           | STORED AS PARQUET
+           | """.stripMargin).collect()
+
+      // Insert test data with partitions
+      spark.sql(
+        s"""
+           | INSERT INTO $table PARTITION (dt='2024-01-01', region='east')
+           | VALUES (1, 'a', 100), (2, 'b', 200), (11, 'aa', 100), (22, 'b', 200)
+           |""".stripMargin)
+
+      spark.sql(
+        s"""
+           | INSERT INTO $table PARTITION (dt='2024-01-01', region='west')
+           | VALUES (3, 'c', 300), (4, 'd', 400), (33, 'cc', 300), (44, 'dd', 400)
+           |""".stripMargin)
+      spark.sql(
+        s"""
+           | INSERT INTO $table PARTITION (dt='2024-01-02', region='east')
+           | VALUES (5, 'e', 500), (6, 'f', 600), (55, 'ee', 500), (66, 'ff', 600)
+           | """.stripMargin)
+
+      // Test multiple partition filters
+      val df1 = spark.sql(
+        s"""
+           | SELECT * FROM $table
+           | WHERE dt = '2024-01-01' AND region = 'east' AND value > 1500
+           |""".stripMargin)
+      assert(df1.count() === 0)
+
+      // Test multiple partition filters
+      val df2 = spark.sql(
+        s"""
+           | SELECT * FROM $table
+           | WHERE dt = '2024-01-01' AND region = 'east' AND value > 150
+           |""".stripMargin)
+      assert(df2.count() === 2)
+      assert(df2.collect().map(_.getInt(0)).toSet === Set(2, 22))
+
+      // Test explain
+      val df3 = spark.sql(
+        s"""
+           | EXPLAIN SELECT count(*) as total_rows
+           | FROM $table
+           | WHERE dt = '2024-01-01' AND region = 'east' AND value > 1
+           |""".stripMargin)
+      assert(df3.count() === 1)
+      // contains like : PushedFilters: [IsNotNull(value), GreaterThan(value,1)]
+      assert(df3.collect().map(_.getString(0)).filter { s =>
+        s.contains("PushedFilters") && !s.contains("PushedFilters: []")
+      }.toSet.size == 1)
+
+      // Test aggregation pushdown partition filters
+      spark.conf.set("spark.sql.parquet.aggregatePushdown", true)
+
+      // Test aggregation pushdown partition filters
+      val df4 = spark.sql(
+        s"""
+           | SELECT count(*) as total_rows
+           | FROM $table
+           | WHERE dt = '2024-01-01' AND region = 'east'
+           | group by dt, region
+           | """.stripMargin)
+      assert(df4.count() === 1)
+      assert(df4.collect().map(_.getLong(0)).toSet === Set(4L))
+
+      val df5 = spark.sql(
+        s"""
+           | EXPLAIN SELECT count(*) as total_rows
+           | FROM $table
+           | WHERE dt = '2024-01-01' AND region = 'east'
+           | group by dt, region
+           | """.stripMargin)
+      assert(df5.count() === 1)
+      // contains like : PushedAggregation: [COUNT(*)],
+      assert(df5.collect().map(_.getString(0)).filter { s =>
+        s.contains("PushedAggregation") && !s.contains("PushedAggregation: []")
+      }.toSet.size == 1)
+
+      spark.conf.set("spark.sql.parquet.aggregatePushdown", false)
+
+    }
+  }
+
   private def readPartitionedTable(format: String, hiveTable: Boolean): Unit = {
     withSparkSession() { spark =>
       val table = "hive.default.employee"
