Commit e460e1c

PairRDDFunctions + SparkHadoopWriter
1 parent 9e1ff77 commit e460e1c

2 files changed (+123, -268 lines)


docs/SparkHadoopWriter.md

Lines changed: 46 additions & 52 deletions
@@ -1,77 +1,66 @@
-# SparkHadoopWriter
+# SparkHadoopWriter Utility
 
-`SparkHadoopWriter` utility is used to <<write, write a key-value RDD (as a Hadoop OutputFormat)>>.
+## <span id="write"> Writing Key-Value RDD Out (As Hadoop OutputFormat)
 
-`SparkHadoopWriter` utility is used by rdd:PairRDDFunctions.md#saveAsNewAPIHadoopDataset[saveAsNewAPIHadoopDataset] and rdd:PairRDDFunctions.md#saveAsHadoopDataset[saveAsHadoopDataset] transformations.
-
-[[logging]]
-[TIP]
-====
-Enable `ALL` logging level for `org.apache.spark.internal.io.SparkHadoopWriter` logger to see what happens inside.
-
-Add the following line to `conf/log4j.properties`:
-
-```
-log4j.logger.org.apache.spark.internal.io.SparkHadoopWriter=ALL
-```
-
-Refer to <<spark-logging.md#, Logging>>.
-====
-
-== [[write]] Writing Key-Value RDD Out (As Hadoop OutputFormat) -- `write` Utility
-
-[source, scala]
-----
+```scala
 write[K, V: ClassTag](
   rdd: RDD[(K, V)],
   config: HadoopWriteConfigUtil[K, V]): Unit
-----
+```
+
+!!! FIXME
+    Review Me
 
-[[write-commitJobId]]
+<span id="write-commitJobId">
 `write` uses the id of the given RDD as the `commitJobId`.
 
-[[write-jobTrackerId]]
+<span id="write-jobTrackerId">
 `write` creates a `jobTrackerId` with the current date.
 
-[[write-jobContext]]
-`write` requests the given `HadoopWriteConfigUtil` to <<HadoopWriteConfigUtil.md#createJobContext, create a Hadoop JobContext>> (for the <<write-jobTrackerId, jobTrackerId>> and <<write-commitJobId, commitJobId>>).
+<span id="write-jobContext">
+`write` requests the given `HadoopWriteConfigUtil` to [create a Hadoop JobContext](HadoopWriteConfigUtil.md#createJobContext) (for the [jobTrackerId](#write-jobTrackerId) and [commitJobId](#write-commitJobId)).
 
-`write` requests the given `HadoopWriteConfigUtil` to <<HadoopWriteConfigUtil.md#initOutputFormat, initOutputFormat>> with the Hadoop https://hadoop.apache.org/docs/r2.7.3/api/org/apache/hadoop/mapreduce/JobContext.html[JobContext].
+`write` requests the given `HadoopWriteConfigUtil` to [initOutputFormat](HadoopWriteConfigUtil.md#initOutputFormat) with the Hadoop [JobContext]({{ hadoop.api }}/api/org/apache/hadoop/mapreduce/JobContext.html).
 
-`write` requests the given `HadoopWriteConfigUtil` to <<HadoopWriteConfigUtil.md#assertConf, assertConf>>.
+`write` requests the given `HadoopWriteConfigUtil` to [assertConf](HadoopWriteConfigUtil.md#assertConf).
 
-`write` requests the given `HadoopWriteConfigUtil` to <<HadoopWriteConfigUtil.md#createCommitter, create a HadoopMapReduceCommitProtocol committer>> for the <<write-commitJobId, commitJobId>>.
+`write` requests the given `HadoopWriteConfigUtil` to [create a HadoopMapReduceCommitProtocol committer](HadoopWriteConfigUtil.md#createCommitter) for the [commitJobId](#write-commitJobId).
 
-`write` requests the `HadoopMapReduceCommitProtocol` to <<HadoopMapReduceCommitProtocol.md#setupJob, setupJob>> (with the <<write-jobContext, jobContext>>).
+`write` requests the `HadoopMapReduceCommitProtocol` to [setupJob](HadoopMapReduceCommitProtocol.md#setupJob) (with the [jobContext](#write-jobContext)).
 
-[[write-runJob]][[write-executeTask]]
-`write` uses the `SparkContext` (of the given RDD) to SparkContext.md#runJob[run a Spark job asynchronously] for the given RDD with the <<executeTask, executeTask>> partition function.
+<span id="write-runJob"><span id="write-executeTask">
+`write` uses the `SparkContext` (of the given RDD) to [run a Spark job asynchronously](SparkContext.md#runJob) for the given RDD with the [executeTask](#executeTask) partition function.
 
-[[write-commitJob]]
-In the end, `write` requests the <<write-committer, HadoopMapReduceCommitProtocol>> to <<HadoopMapReduceCommitProtocol.md#commitJob, commit the job>> and prints out the following INFO message to the logs:
+<span id="write-commitJob">
+In the end, `write` requests the [HadoopMapReduceCommitProtocol](#write-committer) to [commit the job](HadoopMapReduceCommitProtocol.md#commitJob) and prints out the following INFO message to the logs:
 
-```
+```text
 Job [getJobID] committed.
 ```
 
-NOTE: `write` is used when `PairRDDFunctions` is requested to rdd:PairRDDFunctions.md#saveAsNewAPIHadoopDataset[saveAsNewAPIHadoopDataset] and rdd:PairRDDFunctions.md#saveAsHadoopDataset[saveAsHadoopDataset].
-
-=== [[write-Throwable]] `write` Utility And Throwables
+### <span id="write-Throwable"> Throwables
 
 In case of any `Throwable`, `write` prints out the following ERROR message to the logs:
 
-```
+```text
 Aborting job [getJobID].
 ```
 
-[[write-abortJob]]
-`write` requests the <<write-committer, HadoopMapReduceCommitProtocol>> to <<HadoopMapReduceCommitProtocol.md#abortJob, abort the job>> and throws a `SparkException`:
+<span id="write-abortJob">
+`write` requests the [HadoopMapReduceCommitProtocol](#write-committer) to [abort the job](HadoopMapReduceCommitProtocol.md#abortJob) and throws a `SparkException`:
 
 ```text
 Job aborted.
 ```
 
-## <span id="executeTask"> Writing RDD Partition
+### <span id="write-usage"> Usage
+
+`write` is used when:
+
+* [PairRDDFunctions.saveAsNewAPIHadoopDataset](rdd/PairRDDFunctions.md#saveAsNewAPIHadoopDataset)
+* [PairRDDFunctions.saveAsHadoopDataset](rdd/PairRDDFunctions.md#saveAsHadoopDataset)
+
+### <span id="executeTask"> Writing RDD Partition
 
 ```scala
 executeTask[K, V: ClassTag](
@@ -85,6 +74,9 @@ executeTask[K, V: ClassTag](
   iterator: Iterator[(K, V)]): TaskCommitMessage
 ```
 
+!!! FIXME
+    Review Me
+
 `executeTask` requests the given `HadoopWriteConfigUtil` to [create a TaskAttemptContext](HadoopWriteConfigUtil.md#createTaskAttemptContext).
 
 `executeTask` requests the given `FileCommitProtocol` to [set up a task](FileCommitProtocol.md#setupTask) with the `TaskAttemptContext`.
@@ -103,16 +95,18 @@ In case of any errors, `executeTask` requests the given `HadoopWriteConfigUtil`
 Task [taskAttemptID] aborted.
 ```
 
-`executeTask` is used when `SparkHadoopWriter` utility is used to [write](#write).
+`executeTask` is used when:
 
-== [[initHadoopOutputMetrics]] `initHadoopOutputMetrics` Utility
+* `SparkHadoopWriter` utility is used to [write](#write)
 
-[source, scala]
-----
-initHadoopOutputMetrics(
-  context: TaskContext): (OutputMetrics, () => Long)
-----
+## Logging
 
-`initHadoopOutputMetrics`...FIXME
+Enable `ALL` logging level for `org.apache.spark.internal.io.SparkHadoopWriter` logger to see what happens inside.
+
+Add the following line to `conf/log4j.properties`:
+
+```text
+log4j.logger.org.apache.spark.internal.io.SparkHadoopWriter=ALL
+```
 
-NOTE: `initHadoopOutputMetrics` is used when `SparkHadoopWriter` utility is used to <<executeTask, executeTask>>.
+Refer to [Logging](spark-logging.md).
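
The Usage section added in this diff lists `PairRDDFunctions.saveAsNewAPIHadoopDataset` and `PairRDDFunctions.saveAsHadoopDataset` as the callers of `SparkHadoopWriter.write`. For orientation, here is a minimal, self-contained sketch of an application that reaches that code path; the object name, local master, sample data, and output path are illustrative assumptions, not part of the commit. `saveAsNewAPIHadoopFile` is a convenience wrapper that ends up in `saveAsNewAPIHadoopDataset`, which (per the docs above) delegates to `SparkHadoopWriter.write`.

```scala
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

object SaveAsHadoopDatasetDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[*]").setAppName("SparkHadoopWriter demo"))

    // A key-value RDD; saving it through the new Hadoop OutputFormat API goes through
    // PairRDDFunctions.saveAsNewAPIHadoopDataset and, from there, SparkHadoopWriter.write.
    val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "c" -> 3))
      .map { case (k, v) => (new Text(k), new Text(v.toString)) }

    // saveAsNewAPIHadoopFile configures the Hadoop Job and calls saveAsNewAPIHadoopDataset.
    pairs.saveAsNewAPIHadoopFile(
      "/tmp/spark-hadoop-writer-demo", // assumption: a writable path that does not exist yet
      classOf[Text],
      classOf[Text],
      classOf[TextOutputFormat[Text, Text]])

    sc.stop()
  }
}
```

With the logger set to `ALL` (as in the Logging section of the diff), a successful run should end with the `Job [getJobID] committed.` INFO message described above.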
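
The new text also spells out `write`'s job-level control flow: set up the committer, run a write task per partition, then either commit the job or, on any `Throwable`, log an error, abort the job, and throw a `SparkException` (`Job aborted.`). The sketch below mirrors that sequence in plain Scala for illustration only; the `CommitProtocol` trait and `writeLike` helper are hypothetical stand-ins, not Spark's `FileCommitProtocol` or `HadoopMapReduceCommitProtocol` API.

```scala
object WriteFlowSketch {

  // Hypothetical stand-in for the commit protocol the documentation describes.
  trait CommitProtocol {
    def setupJob(): Unit
    def commitJob(): Unit
    def abortJob(): Unit
  }

  // Schematic version of the documented flow: setupJob, one write task per partition,
  // then commitJob on success, or abortJob and rethrow on any Throwable.
  def writeLike[T](partitions: Seq[Seq[T]], committer: CommitProtocol)(writeTask: Seq[T] => Unit): Unit = {
    committer.setupJob()
    try {
      partitions.foreach(writeTask)   // SparkHadoopWriter runs this as a Spark job (executeTask per partition)
      committer.commitJob()
      println("Job committed.")       // the real code logs INFO: Job [getJobID] committed.
    } catch {
      case t: Throwable =>
        println("Aborting job.")      // the real code logs ERROR: Aborting job [getJobID].
        committer.abortJob()
        throw new RuntimeException("Job aborted.", t) // SparkHadoopWriter throws a SparkException
    }
  }

  def main(args: Array[String]): Unit = {
    val committer = new CommitProtocol {
      def setupJob(): Unit = println("setupJob")
      def commitJob(): Unit = println("commitJob")
      def abortJob(): Unit = println("abortJob")
    }
    writeLike(Seq(Seq(1, 2), Seq(3)), committer)(part => println(s"writing partition $part"))
  }
}
```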
