From 1da81f291bb1852ddb2f9ac62e506c5332a8fd82 Mon Sep 17 00:00:00 2001
From: tian bao <2011xuesong@gmail.com>
Date: Wed, 27 Aug 2025 17:32:58 +0800
Subject: [PATCH 1/2] Cache the file index in the Hive connector

cache version 2

add cache unit tests

update unit tests
---
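Notes: the change below replaces Spark's session-wide FileStatusCache with a
HiveFileStatusCache keyed by the table's qualified name, so each table gets
isolated entries while all tables share one memory budget. A minimal sketch of
the Spark settings that gate the new cache, based on the SQLConf entries the
code reads (the values shown are illustrative, not part of the patch):

    spark.sql.hive.manageFilesourcePartitions=true
    # shared size budget, in bytes, for cached file listings
    spark.sql.hive.filesourcePartitionFileCacheSize=262144000
    # entries expire this many seconds after write; non-positive disables TTL
    spark.sql.metadataCacheTTLSeconds=60
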
 .../connector/hive/read/HiveFileIndex.scala   |   3 +-
 .../hive/read/HiveFileStatusCache.scala       | 142 ++++++++++++++++++
 .../hive/HiveFileStatusCacheSuite.scala       |  96 ++++++++++++
 3 files changed, 240 insertions(+), 1 deletion(-)
 create mode 100644 extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileStatusCache.scala
 create mode 100644 extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala

diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
index 0142c556194..217a2c9d137 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
@@ -48,7 +48,8 @@ class HiveCatalogFileIndex(
   private val partPathToBindHivePart: mutable.Map[PartitionPath, CatalogTablePartition] =
     mutable.Map()
 
-  private val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+  private val fileStatusCache =
+    HiveFileStatusCache.getOrCreate(sparkSession, catalogTable.qualifiedName)
 
   private val baseLocation: Option[URI] = table.storage.locationUri
 
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileStatusCache.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileStatusCache.scala
new file mode 100644
index 00000000000..0ef2743a455
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileStatusCache.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive.read
+
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import com.google.common.cache._
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache}
+import org.apache.spark.util.SizeEstimator
+
+/**
+ * Use [[HiveFileStatusCache.getOrCreate()]] to construct a globally shared file status cache.
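+ *
+ * A usage sketch (illustrative only, assuming a started SparkSession `spark` and a
+ * Hadoop `path` whose listing was collected into `leafStatuses`):
+ * {{{
+ *   val cache = HiveFileStatusCache.getOrCreate(spark, "catalog.db.table")
+ *   cache.putLeafFiles(path, leafStatuses)
+ *   cache.getLeafFiles(path)   // Some(leafStatuses) until TTL or size eviction
+ * }}}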
+ */
+object HiveFileStatusCache {
+  private var sharedCache: HiveSharedInMemoryCache = _
+
+  /**
+   * @return a new FileStatusCache based on session configuration. Cache memory quota is
+   *         shared across all clients.
+   */
+  def getOrCreate(session: SparkSession, qualifiedName: String): FileStatusCache =
+    synchronized {
+      if (session.sessionState.conf.manageFilesourcePartitions &&
+        session.sessionState.conf.filesourcePartitionFileCacheSize > 0) {
+        if (sharedCache == null) {
+          sharedCache = new HiveSharedInMemoryCache(
+            session.sessionState.conf.filesourcePartitionFileCacheSize,
+            session.sessionState.conf.metadataCacheTTL)
+        }
+        sharedCache.createForNewClient(qualifiedName)
+      } else {
+        NoopCache
+      }
+    }
+
+  def resetForTesting(): Unit = synchronized {
+    sharedCache = null
+  }
+}
+
+/**
+ * An implementation that caches partition file statuses in memory.
+ *
+ * @param maxSizeInBytes max allowable cache size before entries start getting evicted
+ */
+private class HiveSharedInMemoryCache(maxSizeInBytes: Long, cacheTTL: Long) extends Logging {
+
+  // Opaque object that uniquely identifies a shared cache user
+  private type ClientId = Object
+
+  private val warnedAboutEviction = new AtomicBoolean(false)
+
+  // We use a composite cache key in order to distinguish entries inserted by different clients.
+  private val cache: Cache[(ClientId, Path), Array[FileStatus]] = {
+    // [[Weigher]].weigh returns an Int, so on its own the cache could only track
+    // objects totalling < 2GB. Instead, each weight is divided by this factor
+    // (which is smaller than the size of one [[FileStatus]]), letting the cache
+    // account for up to ~64GB of entries.
+    val weightScale = 32
+    val weigher = new Weigher[(ClientId, Path), Array[FileStatus]] {
+      override def weigh(key: (ClientId, Path), value: Array[FileStatus]): Int = {
+        val estimate = (SizeEstimator.estimate(key) + SizeEstimator.estimate(value)) / weightScale
+        if (estimate > Int.MaxValue) {
+          logWarning(s"Cached table partition metadata size is too big. Approximating to " +
+            s"${Int.MaxValue.toLong * weightScale}.")
+          Int.MaxValue
+        } else {
+          estimate.toInt
+        }
+      }
+    }
+    val removalListener = new RemovalListener[(ClientId, Path), Array[FileStatus]]() {
+      override def onRemoval(
+          removed: RemovalNotification[(ClientId, Path), Array[FileStatus]]): Unit = {
+        if (removed.getCause == RemovalCause.SIZE &&
+          warnedAboutEviction.compareAndSet(false, true)) {
+          logWarning(
+            "Evicting cached table partition metadata from memory due to size constraints " +
+              "(spark.sql.hive.filesourcePartitionFileCacheSize = " +
+              maxSizeInBytes + " bytes). This may impact query planning performance.")
+        }
+      }
+    }
+
+    var builder = CacheBuilder.newBuilder()
+      .weigher(weigher)
+      .removalListener(removalListener)
+      .maximumWeight(maxSizeInBytes / weightScale)
+
+    if (cacheTTL > 0) {
+      builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS)
+    }
+
+    builder.build[(ClientId, Path), Array[FileStatus]]()
+  }
+
+  /**
+   * @return a FileStatusCache that does not share any entries with any other client, but does
+   *         share memory resources for the purpose of cache eviction.
+   */
+  def createForNewClient(clientId: Object): HiveFileStatusCache = new HiveFileStatusCache {
+
+    override def getLeafFiles(path: Path): Option[Array[FileStatus]] = {
+      Option(cache.getIfPresent((clientId, path)))
+    }
+
+    override def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit = {
+      cache.put((clientId, path), leafFiles)
+    }
+
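+    // Guava's Cache has no notion of per-client invalidation, so walk the shared
+    // map and drop only the entries owned by this client.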
+    override def invalidateAll(): Unit = {
+      cache.asMap.asScala.foreach { case (key, _) =>
+        if (key._1 == clientId) {
+          cache.invalidate(key)
+        }
+      }
+    }
+  }
+
+  abstract class HiveFileStatusCache extends FileStatusCache {}
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala
new file mode 100644
index 00000000000..3bb66800625
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive
+
+import scala.concurrent.duration.DurationInt
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.scalatest.concurrent.Eventually.eventually
+import org.scalatest.concurrent.Futures.timeout
+
+import org.apache.kyuubi.spark.connector.hive.read.HiveFileStatusCache
+
+class HiveFileStatusCacheSuite extends KyuubiHiveTest {
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    HiveFileStatusCache.resetForTesting()
+  }
+
+  override def afterEach(): Unit = {
+    super.afterEach()
+    HiveFileStatusCache.resetForTesting()
+  }
+
+  test("cached by qualifiedName") {
+    val previousValue = SQLConf.get.getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS)
+    try {
+      // Use 'SQLConf.get.setConf' instead of 'withSQLConf' to set a static config at runtime.
+      SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, 1L)
+
+      val path = new Path("/dummy_tmp", "abc")
+      val files = (1 to 3).map(_ => new FileStatus())
+
+      HiveFileStatusCache.resetForTesting()
+      val fileStatusCacheTable1 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.table1")
+      fileStatusCacheTable1.putLeafFiles(path, files.toArray)
+      val fileStatusCacheTable2 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.table1")
+      val fileStatusCacheTable3 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.table2")
+
+      // Exactly 3 files are cached, and only for the table that put them.
+      assert(fileStatusCacheTable1.getLeafFiles(path).get.length === 3)
+      assert(fileStatusCacheTable2.getLeafFiles(path).get.length === 3)
+      assert(fileStatusCacheTable3.getLeafFiles(path).isEmpty === true)
+      // Wait for the cache to expire.
+      eventually(timeout(3.seconds)) {
+        // And the cache is gone.
+        assert(fileStatusCacheTable1.getLeafFiles(path).isEmpty === true)
+        assert(fileStatusCacheTable2.getLeafFiles(path).isEmpty === true)
+        assert(fileStatusCacheTable3.getLeafFiles(path).isEmpty === true)
+      }
+    } finally {
+      SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, previousValue)
+    }
+  }
+
+  test("expire FileStatusCache if TTL is configured") {
+    val previousValue = SQLConf.get.getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS)
+    try {
+      // Use 'SQLConf.get.setConf' instead of 'withSQLConf' to set a static config at runtime.
+      SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, 1L)
+
+      val path = new Path("/dummy_tmp", "abc")
+      val files = (1 to 3).map(_ => new FileStatus())
+
+      HiveFileStatusCache.resetForTesting()
+      val fileStatusCache = HiveFileStatusCache.getOrCreate(spark, "catalog.db.table")
+      fileStatusCache.putLeafFiles(path, files.toArray)
+
+      // Exactly 3 files are cached.
+      assert(fileStatusCache.getLeafFiles(path).get.length === 3)
+      // Wait for the cache to expire.
+      eventually(timeout(3.seconds)) {
+        // And the cache is gone.
+        assert(fileStatusCache.getLeafFiles(path).isEmpty === true)
+      }
+    } finally {
+      SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, previousValue)
+    }
+  }
+}
From 9353f0c79cce190f9883c463c60261cf54f7584f Mon Sep 17 00:00:00 2001
From: tian bao <2011xuesong@gmail.com>
Date: Tue, 9 Sep 2025 16:01:12 +0800
Subject: [PATCH 2/2] Fix failed tests
---
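Notes: dropTable now refreshes the dropped table's file index so a stale cached
file listing cannot survive into a re-created table of the same name, and the
per-suite cache reset moves from HiveFileStatusCacheSuite into KyuubiHiveTest so
every suite starts with an empty shared cache. The type test in the hunk below
could equally be written as a pattern match; an equivalent sketch using the same
names as the diff:

    table match {
      case hiveTable: HiveTable => hiveTable.fileIndex.refresh()
      case _ => // not a HiveTable, nothing cached to invalidate
    }
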
 .../kyuubi/spark/connector/hive/HiveTableCatalog.scala |  6 +++++-
 .../connector/hive/HiveFileStatusCacheSuite.scala      | 10 ----------
 .../kyuubi/spark/connector/hive/KyuubiHiveTest.scala   |  3 +++
 3 files changed, 8 insertions(+), 11 deletions(-)

diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
index f72881f928f..bbd9df41d05 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
@@ -395,11 +395,15 @@ class HiveTableCatalog(sparkSession: SparkSession)
   override def dropTable(ident: Identifier): Boolean =
     withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
       try {
-        if (loadTable(ident) != null) {
+        val table = loadTable(ident)
+        if (table != null) {
           catalog.dropTable(
             ident.asTableIdentifier,
             ignoreIfNotExists = true,
             purge = true /* skip HDFS trash */ )
+          if (table.isInstanceOf[HiveTable]) {
+            table.asInstanceOf[HiveTable].fileIndex.refresh()
+          }
           true
         } else {
           false
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala
index 3bb66800625..3706bed4556 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala
@@ -28,16 +28,6 @@ import org.apache.kyuubi.spark.connector.hive.read.HiveFileStatusCache
 
 class HiveFileStatusCacheSuite extends KyuubiHiveTest {
 
-  override def beforeEach(): Unit = {
-    super.beforeEach()
-    HiveFileStatusCache.resetForTesting()
-  }
-
-  override def afterEach(): Unit = {
-    super.afterEach()
-    HiveFileStatusCache.resetForTesting()
-  }
-
   test("cached by qualifiedName") {
     val previousValue = SQLConf.get.getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS)
     try {
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala
index fb5bcd62184..68d01f1cc78 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, TableCatalog}
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.Utils
 
 import org.apache.kyuubi.spark.connector.common.LocalSparkSession
+import org.apache.kyuubi.spark.connector.hive.read.HiveFileStatusCache
 
 abstract class KyuubiHiveTest extends QueryTest with Logging {
 
@@ -49,11 +50,13 @@ abstract class KyuubiHiveTest extends QueryTest with Logging {
 
   override def beforeEach(): Unit = {
     super.beforeAll()
+    HiveFileStatusCache.resetForTesting()
     getOrCreateSpark()
   }
 
   override def afterEach(): Unit = {
     super.afterAll()
+    HiveFileStatusCache.resetForTesting()
     LocalSparkSession.stop(innerSpark)
   }
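
A self-contained sketch of the caching technique HiveSharedInMemoryCache builds
on: Guava's CacheBuilder with a scaled Weigher plus expireAfterWrite. Everything
here is illustrative (demo-only names and sizes, with an array length standing
in for the patch's SizeEstimator):

    import java.util.concurrent.TimeUnit

    import com.google.common.cache.{CacheBuilder, Weigher}

    object WeighedTtlCacheDemo {
      def main(args: Array[String]): Unit = {
        // Same trick as the patch: divide each weight by a scale factor so an
        // Int-valued Weigher can account for far more than 2GB in total.
        val weightScale = 32
        val cache = CacheBuilder.newBuilder()
          .weigher(new Weigher[String, Array[Byte]] {
            override def weigh(key: String, value: Array[Byte]): Int =
              math.max(1, value.length / weightScale)
          })
          .maximumWeight(1024 * 1024 / weightScale) // ~1MB budget after scaling
          .expireAfterWrite(1, TimeUnit.SECONDS) // TTL, like metadataCacheTTL
          .build[String, Array[Byte]]()

        cache.put("catalog.db.table1", new Array[Byte](4096))
        // Present: the entry fits the weight budget and is younger than the TTL.
        assert(cache.getIfPresent("catalog.db.table1") != null)

        Thread.sleep(1500)
        // Gone: Guava drops the entry once the 1-second TTL has passed.
        assert(cache.getIfPresent("catalog.db.table1") == null)
      }
    }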