
Commit 11b2cc2

ColumnarBatch, ParquetPartitionReaderFactory and spark.sql.columnVector.offheap.enabled
1 parent 4c9cffb commit 11b2cc2

7 files changed: 77 additions & 21 deletions


docs/SQLConf.md

Lines changed: 5 additions & 1 deletion

````diff
@@ -665,7 +665,11 @@ Used when:
 
 ## <span id="numShufflePartitions"><span id="SHUFFLE_PARTITIONS"> numShufflePartitions
 
-The value of [spark.sql.shuffle.partitions](configuration-properties.md#spark.sql.shuffle.partitions) configuration property or...FIXME
+[spark.sql.shuffle.partitions](configuration-properties.md#spark.sql.shuffle.partitions)
+
+## <span id="COLUMN_VECTOR_OFFHEAP_ENABLED"><span id="offHeapColumnVectorEnabled"> offHeapColumnVectorEnabled
+
+[spark.sql.columnVector.offheap.enabled](configuration-properties.md#spark.sql.columnVector.offheap.enabled)
 
 ## <span id="rangeExchangeSampleSizePerPartition"><span id="RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION"> rangeExchangeSampleSizePerPartition
````
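The `offHeapColumnVectorEnabled` entry added above maps to a method on `SQLConf`. A minimal sketch of reading and flipping it (assuming the internal `org.apache.spark.sql.internal.SQLConf` class is on the classpath, e.g. in `spark-shell`; not part of this commit):

```scala
import org.apache.spark.sql.internal.SQLConf

// offHeapColumnVectorEnabled reads spark.sql.columnVector.offheap.enabled,
// which defaults to false (on-heap column vectors).
val conf = new SQLConf
assert(!conf.offHeapColumnVectorEnabled)

// Flip the property and observe the method follow it.
conf.setConfString("spark.sql.columnVector.offheap.enabled", "true")
assert(conf.offHeapColumnVectorEnabled)
```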

docs/configuration-properties.md

Lines changed: 17 additions & 8 deletions

````diff
@@ -116,6 +116,23 @@ Default: `true`
 
 Use [SQLConf.ENABLE_FULL_OUTER_SHUFFLED_HASH_JOIN_CODEGEN](SQLConf.md#ENABLE_FULL_OUTER_SHUFFLED_HASH_JOIN_CODEGEN) to access the property
 
+## <span id="spark.sql.columnVector.offheap.enabled"> columnVector.offheap.enabled
+
+**spark.sql.columnVector.offheap.enabled**
+
+**(internal)** Enables [OffHeapColumnVector](OffHeapColumnVector.md) (`true`) or [OnHeapColumnVector](OnHeapColumnVector.md) (`false`) in [ColumnarBatch](vectorized-query-execution/ColumnarBatch.md)
+
+Default: `false`
+
+Use [SQLConf.offHeapColumnVectorEnabled](SQLConf.md#offHeapColumnVectorEnabled) for the current value
+
+Used when:
+
+* `RowToColumnarExec` is requested to `doExecuteColumnar`
+* `DefaultCachedBatchSerializer` is requested to `vectorTypes` and `convertCachedBatchToColumnarBatch`
+* `ParquetFileFormat` is requested to [vectorTypes](datasources/parquet/ParquetFileFormat.md#vectorTypes) and [buildReaderWithPartitionValues](datasources/parquet/ParquetFileFormat.md#buildReaderWithPartitionValues)
+* `ParquetPartitionReaderFactory` is [created](datasources/parquet/ParquetPartitionReaderFactory.md#enableOffHeapColumnVector)
+
 ## <span id="spark.sql.files.maxPartitionBytes"><span id="FILES_MAX_PARTITION_BYTES"> files.maxPartitionBytes
 
 **spark.sql.files.maxPartitionBytes**
@@ -1187,14 +1204,6 @@ Default: `true`
 
 Use [SQLConf.wholeStageSplitConsumeFuncByOperator](SQLConf.md#wholeStageSplitConsumeFuncByOperator) method to access the current value.
 
-## <span id="spark.sql.columnVector.offheap.enabled"> spark.sql.columnVector.offheap.enabled
-
-**(internal)** Enables [OffHeapColumnVector](OffHeapColumnVector.md) in [ColumnarBatch](vectorized-query-execution/ColumnarBatch.md) (`true`) or not (`false`). When `false`, [OnHeapColumnVector](OnHeapColumnVector.md) is used instead.
-
-Default: `false`
-
-Use [SQLConf.offHeapColumnVectorEnabled](SQLConf.md#offHeapColumnVectorEnabled) method to access the current value.
-
 ## <span id="spark.sql.columnNameOfCorruptRecord"> spark.sql.columnNameOfCorruptRecord
 
 ## <span id="spark.sql.constraintPropagation.enabled"> spark.sql.constraintPropagation.enabled
````
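Although the property documented above is internal, it can still be set like any other Spark SQL configuration property. A sketch (the local-master session and app name are illustrative, not from the commit):

```scala
import org.apache.spark.sql.SparkSession

// Enable off-heap column vectors (OffHeapColumnVector) in ColumnarBatch.
// Set before the SparkSession starts so all readers pick it up.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("offheap-column-vectors")
  .config("spark.sql.columnVector.offheap.enabled", true)
  .getOrCreate()

// The value is also visible through the runtime conf.
assert(spark.conf.get("spark.sql.columnVector.offheap.enabled") == "true")
```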

docs/connector/Batch.md

Lines changed: 1 addition & 1 deletion

````diff
@@ -4,7 +4,7 @@
 
 ## Contract
 
-### <span id="createReaderFactory"> createReaderFactory
+### <span id="createReaderFactory"> Creating PartitionReaderFactory
 
 ```java
 PartitionReaderFactory createReaderFactory()
````

docs/datasources/parquet/ParquetPartitionReaderFactory.md

Lines changed: 42 additions & 4 deletions

````diff
@@ -1,6 +1,6 @@
 # ParquetPartitionReaderFactory
 
-`ParquetPartitionReaderFactory` is a [FilePartitionReaderFactory](../FilePartitionReaderFactory.md).
+`ParquetPartitionReaderFactory` is a [FilePartitionReaderFactory](../FilePartitionReaderFactory.md) for [ParquetScan](ParquetScan.md#createReaderFactory) for batch queries.
 
 ## Creating Instance
 
@@ -18,6 +18,13 @@
 
 * `ParquetScan` is requested to [create a PartitionReaderFactory](ParquetScan.md#createReaderFactory)
 
+## <span id="enableOffHeapColumnVector"><span id="spark.sql.columnVector.offheap.enabled"> columnVector.offheap.enabled
+
+`ParquetPartitionReaderFactory` uses [spark.sql.columnVector.offheap.enabled](../../configuration-properties.md#spark.sql.columnVector.offheap.enabled) configuration property when requested for the following:
+
+* [Create a Vectorized Reader](#createParquetVectorizedReader) (and create a [VectorizedParquetRecordReader](VectorizedParquetRecordReader.md#useOffHeap))
+* [Build a Columnar Reader](#buildColumnarReader) (and `convertAggregatesRowToBatch`)
+
 ## <span id="supportColumnarReads"> supportColumnarReads
 
 ```scala
@@ -51,7 +58,7 @@ buildColumnarReader(
 
 In the end, `buildColumnarReader` returns a [PartitionReader](../../connector/PartitionReader.md) that returns [ColumnarBatch](../../vectorized-query-execution/ColumnarBatch.md)es (when [requested for records](../../connector/PartitionReader.md#get)).
 
-## <span id="buildReader"> Building PartitionReader
+## <span id="buildReader"> Building Partition Reader
 
 ```scala
 buildReader(
@@ -95,9 +102,38 @@ createVectorizedReader(
 
 In the end, `createVectorizedReader` requests the [VectorizedParquetRecordReader](VectorizedParquetRecordReader.md) to [initBatch](VectorizedParquetRecordReader.md#initBatch) (with the [partitionSchema](#partitionSchema) and the [partitionValues](../PartitionedFile.md#partitionValues) of the given [PartitionedFile](../PartitionedFile.md)) and returns it.
 
-`createVectorizedReader` is used when:
+---
+
+`createVectorizedReader` is used when `ParquetPartitionReaderFactory` is requested for the following:
+
+* [Build a partition reader (for a file)](#buildReader) (with [enableVectorizedReader](#enableVectorizedReader) enabled)
+* [Build a columnar partition reader (for a file)](#buildColumnarReader)
+
+### <span id="createParquetVectorizedReader"> createParquetVectorizedReader
+
+```scala
+createParquetVectorizedReader(
+  partitionValues: InternalRow,
+  pushed: Option[FilterPredicate],
+  convertTz: Option[ZoneId],
+  datetimeRebaseSpec: RebaseSpec,
+  int96RebaseSpec: RebaseSpec): VectorizedParquetRecordReader
+```
+
+`createParquetVectorizedReader` creates a [VectorizedParquetRecordReader](VectorizedParquetRecordReader.md) (with [capacity](#capacity)).
+
+`createParquetVectorizedReader` creates a [RecordReaderIterator](../RecordReaderIterator.md) (for the `VectorizedParquetRecordReader`).
 
-* `ParquetPartitionReaderFactory` is requested to [buildReader](#buildReader) and [buildColumnarReader](#buildColumnarReader)
+`createParquetVectorizedReader` prints out the following DEBUG message to the logs (with the [partitionSchema](#partitionSchema) and the given `partitionValues`):
+
+```text
+Appending [partitionSchema] [partitionValues]
+```
+
+In the end, `createParquetVectorizedReader` returns the `VectorizedParquetRecordReader`.
+
+??? note "Unused RecordReaderIterator?"
+    It appears that the `RecordReaderIterator` is created but not used. _Feeling confused_.
 
 ## <span id="buildReaderBase"> buildReaderBase
 
@@ -116,6 +152,8 @@ buildReaderBase[T](
 
 `buildReaderBase`...FIXME
 
+---
+
 `buildReaderBase` is used when:
 
 * `ParquetPartitionReaderFactory` is requested to [createRowBaseReader](#createRowBaseReader) and [createVectorizedReader](#createVectorizedReader)
````

docs/datasources/parquet/ParquetScan.md

Lines changed: 1 addition & 1 deletion

````diff
@@ -21,7 +21,7 @@
 
 * `ParquetScanBuilder` is requested to [build a Scan](ParquetScanBuilder.md#build)
 
-## <span id="createReaderFactory"> createReaderFactory
+## <span id="createReaderFactory"> Creating PartitionReaderFactory
 
 ```scala
 createReaderFactory(): PartitionReaderFactory
````

docs/datasources/parquet/VectorizedParquetRecordReader.md

Lines changed: 8 additions & 4 deletions

````diff
@@ -62,21 +62,24 @@ void initBatch(
 
 `initBatch` creates a [batch schema](../../types/index.md) that is [sparkSchema](SpecificParquetRecordReaderBase.md#sparkSchema) and the input `partitionColumns` schema (if available).
 
-`initBatch` requests [OffHeapColumnVector](../../OffHeapColumnVector.md#allocateColumns) or [OnHeapColumnVector](../../OnHeapColumnVector.md#allocateColumns) to allocate column vectors per the input `memMode`, i.e. [OFF_HEAP](#OFF_HEAP) or [ON_HEAP](#ON_HEAP) memory modes, respectively. `initBatch` records the allocated column vectors as the internal [WritableColumnVectors](#columnVectors).
+`initBatch` requests [OffHeapColumnVector](../../OffHeapColumnVector.md#allocateColumns) or [OnHeapColumnVector](../../OnHeapColumnVector.md#allocateColumns) to allocate column vectors per the input `memMode` (i.e., [OFF_HEAP](#OFF_HEAP) or [ON_HEAP](#ON_HEAP) memory modes, respectively). `initBatch` records the allocated column vectors as the internal [WritableColumnVectors](#columnVectors).
 
-!!! note
+!!! note "spark.sql.columnVector.offheap.enabled"
     [OnHeapColumnVector](../../OnHeapColumnVector.md) is used based on [spark.sql.columnVector.offheap.enabled](../../configuration-properties.md#spark.sql.columnVector.offheap.enabled) configuration property.
 
-`initBatch` creates a [ColumnarBatch](../../vectorized-query-execution/ColumnarBatch.md) (with the [allocated WritableColumnVectors](#columnVectors)) and records it as the internal [ColumnarBatch](#columnarBatch).
+`initBatch` creates a [ColumnarBatch](#columnarBatch) (with the [allocated WritableColumnVectors](#columnVectors)).
 
-`initBatch` does some additional maintenance to the [columnVectors](#columnVectors).
+`initBatch` does some additional maintenance to the [WritableColumnVectors](#columnVectors).
+
+---
 
 `initBatch` is used when:
 
 * `VectorizedParquetRecordReader` is requested to [resultBatch](#resultBatch)
 * `ParquetFileFormat` is requested to [build a data reader (with partition column values appended)](ParquetFileFormat.md#buildReaderWithPartitionValues)
 * `ParquetPartitionReaderFactory` is requested to [createVectorizedReader](ParquetPartitionReaderFactory.md#createVectorizedReader)
 
+<!---
 ## Review Me
 
 `VectorizedParquetRecordReader` uses <<OFF_HEAP, OFF_HEAP>> memory mode when [spark.sql.columnVector.offheap.enabled](../../configuration-properties.md#spark.sql.columnVector.offheap.enabled) internal configuration property is enabled (`true`).
@@ -212,3 +215,4 @@ NOTE: `getCurrentValue` is part of the Hadoop https://hadoop.apache.org/docs/r2.
 * `NewHadoopRDD` is requested to compute a partition (`compute`)
 
 * `RecordReaderIterator` is requested for the [next internal row](../RecordReaderIterator.md#next)
+-->
````
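The OFF_HEAP/ON_HEAP choice driven by `spark.sql.columnVector.offheap.enabled` (as described in the `initBatch` section above) can be sketched as follows. The helper function is hypothetical, a simplification of what the reader does internally:

```scala
import org.apache.spark.memory.MemoryMode

// Hypothetical helper mirroring how the memory mode is derived from
// spark.sql.columnVector.offheap.enabled: OFF_HEAP selects
// OffHeapColumnVector, ON_HEAP selects OnHeapColumnVector.
def columnVectorMemoryMode(offHeapEnabled: Boolean): MemoryMode =
  if (offHeapEnabled) MemoryMode.OFF_HEAP else MemoryMode.ON_HEAP

// With the property disabled (the default), ON_HEAP mode is used.
assert(columnVectorMemoryMode(offHeapEnabled = false) == MemoryMode.ON_HEAP)
assert(columnVectorMemoryMode(offHeapEnabled = true) == MemoryMode.OFF_HEAP)
```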

docs/vectorized-query-execution/ColumnarBatch.md

Lines changed: 3 additions & 2 deletions

````diff
@@ -20,12 +20,13 @@ tags:
 
 `ColumnarBatch` is created when:
 
+* `ArrowConverters` utility is requested to `fromBatchIterator`
 * `RowToColumnarExec` unary physical operator is requested to `doExecuteColumnar`
 * [InMemoryTableScanExec](../physical-operators/InMemoryTableScanExec.md) leaf physical operator is requested for a [RDD[ColumnarBatch]](../physical-operators/InMemoryTableScanExec.md#columnarInputRDD)
 * `MapInPandasExec` unary physical operator is requested to `doExecute`
-* `OrcColumnarBatchReader` and `VectorizedParquetRecordReader` are requested to `initBatch`
+* `OrcColumnarBatchReader` is requested to `initBatch`
 * `PandasGroupUtils` utility is requested to `executePython`
-* `ArrowConverters` utility is requested to `fromBatchIterator`
+* `VectorizedParquetRecordReader` is requested to [init a batch](../datasources/parquet/VectorizedParquetRecordReader.md#initBatch)
 
 ## <span id="row"> ColumnarBatchRow
 
````
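In the scenarios listed in the diff above, Spark builds the `ColumnarBatch` internally. For a feel of the API, here is a hand-built batch over a single on-heap int column (illustrative only; values and sizes are made up):

```scala
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

// Allocate one on-heap column vector (capacity 3) and fill it.
val col = new OnHeapColumnVector(3, IntegerType)
Seq(10, 20, 30).zipWithIndex.foreach { case (v, i) => col.putInt(i, v) }

// Wrap the column in a ColumnarBatch and expose 3 rows.
val batch = new ColumnarBatch(Array[ColumnVector](col))
batch.setNumRows(3)

// Row-wise access goes through ColumnarBatchRow instances.
val it = batch.rowIterator()
while (it.hasNext) println(it.next().getInt(0))

batch.close()  // frees the underlying column vectors
```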
