
Commit d2f6ce9

catalinii authored and anuvedverma committed

[SPARK-28098][SQL] Support read partitioned Hive tables with subdirectories (#40)

(cherry picked from commit 984bf78)

1 parent 7f24cf7 commit d2f6ce9

File tree

4 files changed: +44 -3 lines changed

core/src/test/scala/org/apache/spark/util/LyftUtilsSuite.scala
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

core/src/test/scala/org/apache/spark/util/LyftUtilsSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -17,7 +17,7 @@
 
 package org.apache.spark.util
 
-import org.apache.spark.{SparkFunSuite}
+import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
 
 object TestObjectLyftUtils {

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 10 additions & 0 deletions

@@ -4311,6 +4311,13 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val READ_PARTITION_WITH_SUBDIRECTORY_ENABLED =
+    buildConf("spark.sql.sources.readPartitionWithSubdirectory.enabled")
+      .doc("When set to true, Spark SQL could read the files of " +
+        " partitioned hive table from subdirectories under root path of table")
+      .booleanConf
+      .createWithDefault(true)
+
   val LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA =
     buildConf("spark.sql.legacy.avro.allowIncompatibleSchema")
       .internal()

@@ -5254,6 +5261,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def maxConcurrentOutputFileWriters: Int = getConf(SQLConf.MAX_CONCURRENT_OUTPUT_FILE_WRITERS)
 
+  def readPartitionWithSubdirectoryEnabled: Boolean =
+    getConf(READ_PARTITION_WITH_SUBDIRECTORY_ENABLED)
+
   def plannedWriteEnabled: Boolean = getConf(SQLConf.PLANNED_WRITE_ENABLED)
 
   def inferDictAsStruct: Boolean = getConf(SQLConf.INFER_NESTED_DICT_AS_STRUCT)
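
For context, a minimal usage sketch of the new flag (not part of the commit; `spark` is an active SparkSession and `events` is a hypothetical partitioned Hive table whose partition directories contain subdirectories):

// The flag defaults to true in this commit, so subdirectory listing is on.
// It can be toggled per session to fall back to the previous behaviour.
spark.conf.set("spark.sql.sources.readPartitionWithSubdirectory.enabled", "false")
spark.conf.set("spark.sql.sources.readPartitionWithSubdirectory.enabled", "true")

// With the flag on, files nested under the partition directories of the
// hypothetical `events` table are picked up by the scan.
spark.table("events").count()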

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala

Lines changed: 19 additions & 1 deletion

@@ -61,6 +61,9 @@ class InMemoryFileIndex(
   override val rootPaths =
     rootPathsSpecified.filterNot(FileStreamSink.ancestorIsMetadataDirectory(_, hadoopConf))
 
+  val readPartitionWithSubdirectoryEnabled =
+    sparkSession.sessionState.conf.readPartitionWithSubdirectoryEnabled
+
   @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
   @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
   @volatile private var cachedPartitionSpec: PartitionSpec = _

@@ -96,10 +99,25 @@ class InMemoryFileIndex(
     val files = listLeafFiles(rootPaths)
     cachedLeafFiles =
       new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
-    cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
+    cachedLeafDirToChildrenFiles =
+      if (readPartitionWithSubdirectoryEnabled) {
+        files.toArray.groupBy(file => getRootPathsLeafDir(file.getPath.getParent, file.getPath))
+      } else {
+        files.toArray.groupBy(_.getPath.getParent)
+      }
     cachedPartitionSpec = null
   }
 
+  private def getRootPathsLeafDir(path: Path, child: Path): Path = {
+    if (rootPaths.contains(child)) {
+      path
+    } else if (rootPaths.contains(path)) {
+      path
+    } else {
+      getRootPathsLeafDir(path.getParent, path)
+    }
+  }
+
   override def equals(other: Any): Boolean = other match {
     case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
     case _ => false
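
To illustrate the grouping change, a self-contained sketch (not part of the commit; the paths are hypothetical) of how the helper maps a file nested in a subdirectory back to the root path that contains it:

import org.apache.hadoop.fs.Path

// Hypothetical root paths; in InMemoryFileIndex these are the table's
// (partition) directories.
val rootPaths = Seq(new Path("/warehouse/tbl/dt=2019-06-18"))

// Same shape as the private helper above: climb from a file's parent directory
// toward the root until one of the rootPaths is reached.
def getRootPathsLeafDir(path: Path, child: Path): Path = {
  if (rootPaths.contains(child)) path
  else if (rootPaths.contains(path)) path
  else getRootPathsLeafDir(path.getParent, path)
}

val file = new Path("/warehouse/tbl/dt=2019-06-18/sub_dir/part-00000")
// Returns /warehouse/tbl/dt=2019-06-18: the file inside sub_dir is grouped
// under the partition directory itself, so partition inference still sees it.
getRootPathsLeafDir(file.getParent, file)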

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 14 additions & 1 deletion

@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.Striped
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}

@@ -283,7 +284,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         LogicalRelation(
           DataSource(
             sparkSession = sparkSession,
-            paths = rootPath.toString :: Nil,
+            paths = getDirectoryPathSeq(rootPath),
             userSpecifiedSchema = Option(updatedTable.dataSchema),
             bucketSpec = hiveBucketSpec,
             // Do not interpret the 'path' option at all when tables are read using the Hive

@@ -321,6 +322,18 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     result.copy(output = newOutput)
   }
 
+  private def getDirectoryPathSeq(rootPath: Path): Seq[String] = {
+    val enableSupportSubDirectories =
+      sparkSession.sessionState.conf.readPartitionWithSubdirectoryEnabled
+
+    if (enableSupportSubDirectories) {
+      val fs = rootPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+      SparkHadoopUtil.get.listLeafDirStatuses(fs, rootPath).map(_.getPath.toString)
+    } else {
+      rootPath.toString :: Nil
+    }
+  }
+
   private def inferIfNeeded(
       relation: HiveTableRelation,
       options: Map[String, String],
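
As a rough illustration of what getDirectoryPathSeq hands to DataSource when the flag is on, a standalone sketch using only the Hadoop FileSystem API (SparkHadoopUtil is internal to Spark, so it is swapped out here); the table root path and directory layout are hypothetical:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Collect every leaf directory (a directory with no sub-directories) under the
// table root, which is roughly what SparkHadoopUtil.listLeafDirStatuses returns.
def listLeafDirs(fs: FileSystem, dir: Path): Seq[Path] = {
  val subDirs = fs.listStatus(dir).filter(_.isDirectory).map(_.getPath).toSeq
  if (subDirs.isEmpty) Seq(dir) else subDirs.flatMap(listLeafDirs(fs, _))
}

val rootPath = new Path("/warehouse/tbl")  // hypothetical table root
val fs = rootPath.getFileSystem(new Configuration())

// With the flag on, DataSource receives every leaf directory instead of just
// rootPath.toString :: Nil, so files nested in subdirectories are scanned.
val paths: Seq[String] = listLeafDirs(fs, rootPath).map(_.toString)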
