Commit 8e7f8ac
spark.sql.files.minPartitionNum, maxSplitBytes hint and File-Based Data Scanning
1 parent 3c2f1dd commit 8e7f8ac

10 files changed: +80 −43 lines

docs/SQLConf.md

Lines changed: 0 additions & 4 deletions
@@ -462,10 +462,6 @@ Used when:
 
 [spark.sql.files.minPartitionNum](configuration-properties.md#spark.sql.files.minPartitionNum)
 
-Used when:
-
-* `FilePartition` utility is requested for [maxSplitBytes](datasources/FilePartition.md#maxSplitBytes)
-
 ## <span id="filesOpenCostInBytes"> filesOpenCostInBytes
 
 [spark.sql.files.openCostInBytes](configuration-properties.md#spark.sql.files.openCostInBytes)

docs/SparkSession.md

Lines changed: 4 additions & 2 deletions
@@ -419,10 +419,12 @@ leafNodeDefaultParallelism: Int
 
 `leafNodeDefaultParallelism` is the value of [spark.sql.leafNodeDefaultParallelism](configuration-properties.md#spark.sql.leafNodeDefaultParallelism) if defined or `SparkContext.defaultParallelism` ([Spark Core]({{ book.spark_core }}/SparkContext#defaultParallelism)).
 
+---
+
 `leafNodeDefaultParallelism` is used when:
 
-* `SparkSession` is requested to [range](SparkSession.md#range)
+* [SparkSession.range](SparkSession.md#range) operator is used
 * `RangeExec` leaf physical operator is [created](physical-operators/RangeExec.md#numSlices)
 * `CommandResultExec` physical operator is requested for the `RDD[InternalRow]`
 * `LocalTableScanExec` physical operator is requested for the [RDD](physical-operators/LocalTableScanExec.md#rdd)
-* `FilePartition` utility is used to `maxSplitBytes`
+* `FilePartition` is requested for [maxSplitBytes](datasources/FilePartition.md#maxSplitBytes)
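To see the fallback in action, a minimal sketch (the `local[4]` master and the override value `2` are arbitrary choices for illustration):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[4]") // SparkContext.defaultParallelism would be 4
  .config("spark.sql.leafNodeDefaultParallelism", "2") // explicit override
  .getOrCreate()

// SparkSession.range with no explicit numPartitions goes through
// leafNodeDefaultParallelism (via RangeExec's numSlices)
println(spark.range(100).rdd.getNumPartitions) // 2 with the override above
```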

docs/configuration-properties.md

Lines changed: 15 additions & 9 deletions
@@ -137,7 +137,7 @@ Used when:
 
 **spark.sql.files.maxPartitionBytes**
 
-Maximum number of bytes to pack into a single partition when reading files. Effective only for file-based sources (e.g., Parquet, JSON, ORC)
+Maximum number of bytes to pack into a single partition when reading files for file-based data sources (e.g., [Parquet](datasources/parquet/index.md))
 
 Default: `128MB` (like `parquet.block.size`)
 
@@ -147,6 +147,20 @@ Used when:
 
 * `FilePartition` is requested for [maxSplitBytes](datasources/FilePartition.md#maxSplitBytes)
 
+## <span id="spark.sql.files.minPartitionNum"><span id="FILES_MIN_PARTITION_NUM"> files.minPartitionNum
+
+**spark.sql.files.minPartitionNum**
+
+Hint about the minimum number of partitions for file-based data sources (e.g., [Parquet](datasources/parquet/index.md))
+
+Default: [spark.sql.leafNodeDefaultParallelism](SparkSession.md#leafNodeDefaultParallelism)
+
+Use [SQLConf.filesMinPartitionNum](SQLConf.md#filesMinPartitionNum) for the current value
+
+Used when:
+
+* `FilePartition` is requested for [maxSplitBytes](datasources/FilePartition.md#maxSplitBytes)
+
 ## <span id="spark.sql.files.openCostInBytes"><span id="FILES_OPEN_COST_IN_BYTES"> files.openCostInBytes
 
 **spark.sql.files.openCostInBytes**

@@ -1278,14 +1292,6 @@ Default: `0`
 
 Use [SQLConf.maxRecordsPerFile](SQLConf.md#maxRecordsPerFile) method to access the current value.
 
-## <span id="spark.sql.files.minPartitionNum"> spark.sql.files.minPartitionNum
-
-The suggested (not guaranteed) minimum number of split file partitions for file-based data sources such as Parquet, JSON and ORC.
-
-Default: (undefined)
-
-Use [SQLConf.filesMinPartitionNum](SQLConf.md#filesMinPartitionNum) method to access the current value.
-
 ## <span id="spark.sql.inMemoryColumnarStorage.compressed"> spark.sql.inMemoryColumnarStorage.compressed
 
 When enabled, Spark SQL will automatically select a compression codec for each column based on statistics of the data.
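Both scan-related properties are session-scoped and can be tried out from `spark-shell` (a sketch; the values and the dataset path are illustrative only):

```scala
// Cap a scan partition at 64 MB instead of the default 128 MB
spark.conf.set("spark.sql.files.maxPartitionBytes", 64 * 1024 * 1024)

// Hint at (at least) 16 partitions for the next file-based scan
spark.conf.set("spark.sql.files.minPartitionNum", 16)

// Both feed into FilePartition.maxSplitBytes when planning the scan
val q = spark.read.parquet("/tmp/demo.parquet") // hypothetical dataset
println(q.rdd.getNumPartitions)
```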

docs/connector/Batch.md

Lines changed: 2 additions & 2 deletions
@@ -20,13 +20,13 @@ Used when:
 
 * `BatchScanExec` is requested for a [PartitionReaderFactory](../physical-operators/BatchScanExec.md#readerFactory)
 
-### <span id="planInputPartitions"> planInputPartitions
+### <span id="planInputPartitions"> Planning Input Partitions
 
 ```java
 InputPartition[] planInputPartitions()
 ```
 
-[InputPartition](InputPartition.md)s to scan this data source
+[InputPartition](InputPartition.md)s to scan this data source with
 
 See:
 
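For illustration, a bare-bones `Batch` (in Scala; the single-partition layout and the `SinglePartition` class are made up for this sketch, and a real connector would carry split metadata in its `InputPartition`s):

```scala
import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory}

// Carries no split metadata; real connectors put file/offset info here
case class SinglePartition(index: Int) extends InputPartition

class SinglePartitionBatch(readerFactory: PartitionReaderFactory) extends Batch {

  // One InputPartition per RDD partition of the scan
  override def planInputPartitions(): Array[InputPartition] =
    Array[InputPartition](SinglePartition(0))

  override def createReaderFactory(): PartitionReaderFactory = readerFactory
}
```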

docs/datasources/FilePartition.md

Lines changed: 11 additions & 4 deletions
@@ -8,15 +8,22 @@ maxSplitBytes(
   selectedPartitions: Seq[PartitionDirectory]): Long
 ```
 
-`maxSplitBytes` reads the following properties:
+---
+
+`maxSplitBytes` can be adjusted based on the following configuration properties:
 
 * [spark.sql.files.maxPartitionBytes](../configuration-properties.md#spark.sql.files.maxPartitionBytes)
 * [spark.sql.files.openCostInBytes](../configuration-properties.md#spark.sql.files.openCostInBytes)
 * [spark.sql.files.minPartitionNum](../configuration-properties.md#spark.sql.files.minPartitionNum) (default: [Default Parallelism of Leaf Nodes](../SparkSession.md#leafNodeDefaultParallelism))
 
-`maxSplitBytes` uses the given `selectedPartitions` to calculate `totalBytes` based on the size of the files with [spark.sql.files.openCostInBytes](../configuration-properties.md#spark.sql.files.openCostInBytes) added (for each file).
+---
+
+`maxSplitBytes` calculates the total size of all the files (in the given `PartitionDirectory`ies) with [spark.sql.files.openCostInBytes](../configuration-properties.md#spark.sql.files.openCostInBytes) overhead added (to the size of every file).
+
+??? note "PartitionDirectory"
+    `PartitionDirectory` is a collection of `FileStatus`es ([Apache Hadoop]({{ hadoop.api }}/org/apache/hadoop/fs/FileStatus.html)) along with partition values (if there are any).
 
-`maxSplitBytes` calculates `bytesPerCore` to be `totalBytes` divided by [filesMinPartitionNum](../SQLConf.md#filesMinPartitionNum).
+`maxSplitBytes` calculates how many bytes to allow per partition (`bytesPerCore`), which is the total size of all the files divided by the [spark.sql.files.minPartitionNum](../configuration-properties.md#spark.sql.files.minPartitionNum) configuration property.
 
 In the end, `maxSplitBytes` is [spark.sql.files.maxPartitionBytes](../configuration-properties.md#spark.sql.files.maxPartitionBytes) unless
 the maximum of [spark.sql.files.openCostInBytes](../configuration-properties.md#spark.sql.files.openCostInBytes) and `bytesPerCore` is even smaller.

@@ -25,5 +32,5 @@ the maximum of [spark.sql.files.openCostInBytes](../configuration-properties.md#
 
 `maxSplitBytes` is used when:
 
-* `FileSourceScanExec` physical operator is requested to [createReadRDD](../physical-operators/FileSourceScanExec.md#createReadRDD) (and creates a [FileScanRDD](../rdds/FileScanRDD.md))
+* `FileSourceScanExec` physical operator is requested to [create an RDD for scanning](../physical-operators/FileSourceScanExec.md#createReadRDD) (and creates a [FileScanRDD](../rdds/FileScanRDD.md))
 * `FileScan` is requested for [partitions](FileScan.md#partitions)
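The calculation can be modeled standalone (a sketch of the logic described above; the default values mirror the configuration properties, and the file sizes are made up):

```scala
// Standalone model of FilePartition.maxSplitBytes
def maxSplitBytes(
    fileSizes: Seq[Long],
    maxPartitionBytes: Long = 128L * 1024 * 1024, // spark.sql.files.maxPartitionBytes
    openCostInBytes: Long = 4L * 1024 * 1024,     // spark.sql.files.openCostInBytes
    minPartitionNum: Long = 8                     // spark.sql.files.minPartitionNum
): Long = {
  // Every file pays the open-cost overhead on top of its size
  val totalBytes = fileSizes.map(_ + openCostInBytes).sum
  val bytesPerCore = totalBytes / minPartitionNum
  // maxPartitionBytes unless max(openCostInBytes, bytesPerCore) is even smaller
  math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerCore))
}

// e.g., ten 10MB files stay well below the 128MB cap
println(maxSplitBytes(Seq.fill(10)(10L * 1024 * 1024)))
```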

docs/features/index.md

Lines changed: 3 additions & 2 deletions
@@ -1,11 +1,12 @@
 # Features
 
-The following are the features of Spark SQL that help place it in the top of the modern SQL execution engines:
+The following are the features of Spark SQL that help place it at the top of modern distributed SQL query processing engines:
 
 * [Adaptive Query Execution](../adaptive-query-execution/index.md)
 * [Catalog Plugin API](../connector/catalog/index.md)
 * [Columnar Execution](../columnar-execution/index.md)
 * [Dynamic Partition Pruning](../dynamic-partition-pruning/index.md)
+* [File-Based Data Scanning](../file-based-data-scanning/index.md)
 * [Variable Substitution](../variable-substitution.md)
 * [Whole-Stage Code Generation](../whole-stage-code-generation/index.md)
-* _many others_ (listed in the menu on the left)
+* _others_ (listed in the menu on the left)

docs/file-based-data-scanning/index.md

Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
+# File-Based Data Scanning
+
+Spark SQL uses [FileScanRDD](../rdds/FileScanRDD.md) for table scans of File-Based Data Sources (e.g., [parquet](../datasources/parquet/index.md)).
+
+The number of partitions in data scanning is based on the following:
+
+* [maxSplitBytes hint](../datasources/FilePartition.md#maxSplitBytes)
+* [Whether FileFormat is splitable or not](../datasources/FileFormat.md#isSplitable)
+* [Number of split files](../datasources/PartitionedFileUtil.md#splitFiles)
+* Bucket Pruning
+
+File-Based Data Scanning can be [bucketed or not](../physical-operators/FileSourceScanExec.md#bucketedScan).
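As a quick check (assuming a `SparkSession` in scope as `spark` and a parquet dataset at a hypothetical path), the resulting partitioning is visible on the query's RDD:

```scala
val scan = spark.read.parquet("/tmp/events") // hypothetical dataset
// Reflects maxSplitBytes, FileFormat.isSplitable and the split files
println(scan.rdd.getNumPartitions)
```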

docs/physical-operators/FileSourceScanExec.md

Lines changed: 20 additions & 14 deletions
@@ -121,29 +121,35 @@ createReadRDD(
   fsRelation: HadoopFsRelation): RDD[InternalRow]
 ```
 
-!!! note "FIXME: Review Me"
+`createReadRDD` prints out the following INFO message to the logs (with the [maxSplitBytes](../datasources/FilePartition.md#maxSplitBytes) hint and [openCostInBytes](../configuration-properties.md#spark.sql.files.openCostInBytes)):
 
-`createReadRDD` calculates the maximum size of partitions (`maxSplitBytes`) based on the following properties:
-
-* [spark.sql.files.maxPartitionBytes](../configuration-properties.md#spark.sql.files.maxPartitionBytes)
+```text
+Planning scan with bin packing, max size: [maxSplitBytes] bytes,
+open cost is considered as scanning [openCostInBytes] bytes.
+```
 
-* [spark.sql.files.openCostInBytes](../configuration-properties.md#spark.sql.files.openCostInBytes)
+`createReadRDD` determines whether [Bucketing](../bucketing.md) is enabled or not (based on [spark.sql.sources.bucketing.enabled](../configuration-properties.md#spark.sql.sources.bucketing.enabled)) for bucket pruning.
 
-`createReadRDD` sums up the size of all the files (with the extra [spark.sql.files.openCostInBytes](../configuration-properties.md#spark.sql.files.openCostInBytes)) for the given `selectedPartitions` and divides the sum by the "default parallelism" (i.e. number of CPU cores assigned to a Spark application) that gives `bytesPerCore`.
+??? note "Bucket Pruning"
+    **Bucket Pruning** is an optimization to filter out data files from scanning (based on [optionalBucketSet](#optionalBucketSet)).
 
-The maximum size of partitions is then the minimum of [spark.sql.files.maxPartitionBytes](../configuration-properties.md#spark.sql.files.maxPartitionBytes) and the bigger of [spark.sql.files.openCostInBytes](../configuration-properties.md#spark.sql.files.openCostInBytes) and the `bytesPerCore`.
+    With [Bucketing](../bucketing.md) disabled or [optionalBucketSet](#optionalBucketSet) undefined, all files are included in scanning.
 
-`createReadRDD` prints out the following INFO message to the logs:
+`createReadRDD` [splits files](../datasources/PartitionedFileUtil.md#splitFiles) to be scanned (in the given `selectedPartitions`), possibly applying bucket pruning (with [Bucketing](../bucketing.md) enabled). `createReadRDD` uses the following:
 
-```text
-Planning scan with bin packing, max size: [maxSplitBytes] bytes, open cost is considered as scanning [openCostInBytes] bytes.
-```
+* [isSplitable](../datasources/FileFormat.md#isSplitable) property of the [FileFormat](../datasources/FileFormat.md) of the [HadoopFsRelation](#relation)
+* [maxSplitBytes](../datasources/FilePartition.md#maxSplitBytes) hint
 
-For every file (as Hadoop's `FileStatus`) in every partition (as `PartitionDirectory` in the given `selectedPartitions`), `createReadRDD` [gets the HDFS block locations](#getBlockLocations) to create [PartitionedFiles](../datasources/PartitionedFile.md) (possibly split per the maximum size of partitions if the [FileFormat](../datasources/HadoopFsRelation.md#fileFormat) of the [HadoopFsRelation](#fsRelation) is [splittable](../datasources/FileFormat.md#isSplitable)). The partitioned files are then sorted by number of bytes to read (aka _split size_) in decreasing order (from the largest to the smallest).
+`createReadRDD` sorts the split files (by length in reverse order).
 
-`createReadRDD` "compresses" multiple splits per partition if together they are smaller than the `maxSplitBytes` ("Next Fit Decreasing") that gives the necessary partitions (file blocks as [FilePartitions](../rdds/FileScanRDD.md#FilePartition)).
+In the end, `createReadRDD` creates a [FileScanRDD](../rdds/FileScanRDD.md) with the following:
 
-In the end, `createReadRDD` creates a [FileScanRDD](../rdds/FileScanRDD.md) (with the given `(PartitionedFile) => Iterator[InternalRow]` read function and the partitions).
+Property | Value
+---------|------
+[readFunction](../rdds/FileScanRDD.md#readFunction) | Input `readFile` function
+[filePartitions](../rdds/FileScanRDD.md#filePartitions) | [Partitions](../datasources/FilePartition.md#getFilePartitions)
+[readSchema](../rdds/FileScanRDD.md#readSchema) | [requiredSchema](#requiredSchema) with [partitionSchema](../datasources/HadoopFsRelation.md#partitionSchema) of the input [HadoopFsRelation](../datasources/HadoopFsRelation.md)
+[metadataColumns](../rdds/FileScanRDD.md#metadataColumns) | [metadataColumns](#metadataColumns)
 
 ### <span id="dynamicallySelectedPartitions"> Dynamically Selected Partitions
 
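The packing of split files into FilePartitions can be sketched standalone ("Next Fit Decreasing"; `Split` and the types are simplified stand-ins for this illustration):

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-in for a PartitionedFile
case class Split(path: String, length: Long)

// Next Fit Decreasing: sort by length in reverse order, then pack greedily,
// closing a partition once adding a split would exceed maxSplitBytes
def getFilePartitions(
    splits: Seq[Split],
    maxSplitBytes: Long,
    openCostInBytes: Long): Seq[Seq[Split]] = {
  val partitions = ArrayBuffer.empty[Seq[Split]]
  var current = ArrayBuffer.empty[Split]
  var currentSize = 0L

  splits.sortBy(-_.length).foreach { split =>
    if (current.nonEmpty && currentSize + split.length > maxSplitBytes) {
      partitions += current.toSeq
      current = ArrayBuffer.empty[Split]
      currentSize = 0L
    }
    current += split
    currentSize += split.length + openCostInBytes // each split pays the open cost
  }
  if (current.nonEmpty) partitions += current.toSeq
  partitions.toSeq
}
```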

docs/rdds/FileScanRDD.md

Lines changed: 9 additions & 5 deletions
@@ -2,18 +2,22 @@
 
 `FileScanRDD` is the [input RDD](../physical-operators/FileSourceScanExec.md#inputRDD) of [FileSourceScanExec](../physical-operators/FileSourceScanExec.md) leaf physical operator (for [Whole-Stage Java Code Generation](../whole-stage-code-generation/index.md)).
 
-!!! note "The Internals of Apache Spark"
-    Find out more on `RDD` abstraction in [The Internals of Apache Spark]({{ book.spark_core }}/rdd/RDD.html).
+??? note "RDD"
+    Find out more on `RDD` abstraction in [The Internals of Apache Spark]({{ book.spark_core }}/rdd/RDD).
 
 ## Creating Instance
 
 `FileScanRDD` takes the following to be created:
 
 * <span id="sparkSession"> [SparkSession](../SparkSession.md)
-* <span id="readFunction"> Read Function that takes a [PartitionedFile](../datasources/PartitionedFile.md) and gives [internal binary rows](../InternalRow.md) back (`(PartitionedFile) => Iterator[InternalRow]`)
-* <span id="filePartitions"> File Blocks as `FilePartition`s (`Seq[FilePartition]`)
+* <span id="readFunction"> Read Function of [PartitionedFile](../datasources/PartitionedFile.md)s to [InternalRow](../InternalRow.md)s (`(PartitionedFile) => Iterator[InternalRow]`)
+* <span id="filePartitions"> [FilePartition](../datasources/FilePartition.md)s
+* <span id="readSchema"> Read [Schema](../types/StructType.md)
+* <span id="metadataColumns"> Metadata Columns
 
-`FileScanRDD` is created when [FileSourceScanExec](../physical-operators/FileSourceScanExec.md) physical operator is requested to [createBucketedReadRDD](../physical-operators/FileSourceScanExec.md#createBucketedReadRDD) and [createNonBucketedReadRDD](../physical-operators/FileSourceScanExec.md#createNonBucketedReadRDD) (when `FileSourceScanExec` operator is requested for the [input RDD](../physical-operators/FileSourceScanExec.md#inputRDD) when [WholeStageCodegenExec](../physical-operators/WholeStageCodegenExec.md) physical operator is executed).
+`FileScanRDD` is created when:
+
+* [FileSourceScanExec](../physical-operators/FileSourceScanExec.md) physical operator is requested to [createBucketedReadRDD](../physical-operators/FileSourceScanExec.md#createBucketedReadRDD) and [createNonBucketedReadRDD](../physical-operators/FileSourceScanExec.md#createNonBucketedReadRDD) (when `FileSourceScanExec` operator is requested for the [input RDD](../physical-operators/FileSourceScanExec.md#inputRDD) when [WholeStageCodegenExec](../physical-operators/WholeStageCodegenExec.md) physical operator is executed)
 
 ## Configuration Properties
 
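The read function has the shape below (a trivial sketch that yields no rows; a real one comes from the underlying file format's reader):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile

// Conforms to (PartitionedFile) => Iterator[InternalRow]
val readFunction: PartitionedFile => Iterator[InternalRow] =
  (_: PartitionedFile) => Iterator.empty
```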

mkdocs.yml

Lines changed: 4 additions & 1 deletion
@@ -174,6 +174,8 @@ nav:
     - EstimationUtils: cost-based-optimization/EstimationUtils.md
   - Dynamic Partition Pruning:
     - dynamic-partition-pruning/index.md
+  - File-Based Data Scanning:
+    - file-based-data-scanning/index.md
   - Join Queries:
     - Joins: joins.md
     - Broadcast Joins: spark-sql-joins-broadcast.md

@@ -811,6 +813,8 @@ nav:
     - UnsafeHashedRelation: UnsafeHashedRelation.md
     - UnsafeRow: UnsafeRow.md
     - UnsafeRowSerializerInstance: tungsten/UnsafeRowSerializerInstance.md
+  - RDDs:
+    - FileScanRDD: rdds/FileScanRDD.md
   - SQL:
     - sql/index.md
     - AbstractSqlParser: sql/AbstractSqlParser.md

@@ -1059,7 +1063,6 @@ nav:
   - Caching and Persistence: caching-and-persistence.md
   - User-Friendly Names of Cached Queries in web UI: caching-webui-storage.md
   - Checkpointing: checkpointing.md
-  - FileScanRDD: rdds/FileScanRDD.md
   - Logging: spark-logging.md
   - Performance Tuning and Debugging:
     - Debugging Query Execution: debugging-query-execution.md
