Commit c2a952a

[SDP] PipelineEventSender
1 parent 8cbcf8f commit c2a952a

4 files changed (+71 -1 lines changed)

docs/declarative-pipelines/.pages

Lines changed: 1 addition & 0 deletions
@@ -1,4 +1,5 @@
 title: Declarative Pipelines
 nav:
 - index.md
+- configuration-properties.md
 - ...

docs/declarative-pipelines/PipelineEventSender.md

Lines changed: 48 additions & 0 deletions

@@ -0,0 +1,48 @@
# PipelineEventSender

`PipelineEventSender` is used by [PipelinesHandler](PipelinesHandler.md) to [send pipeline execution progress events back to a Spark Connect client asynchronously](#sendEvent).

`PipelineEventSender` uses the [spark.sql.pipelines.event.queue.capacity](./configuration-properties.md#PIPELINES_EVENT_QUEUE_CAPACITY) configuration property to control the depth of the event queue.
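
As a mental model only (a hypothetical sketch with made-up types, not Spark's actual `PipelineEventSender`), an asynchronous, capacity-bounded event sender can be as simple as a background thread draining a blocking queue:

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Hypothetical stand-ins for PipelineEvent and the Spark Connect response
// observer; the real types live in Spark's Connect server module.
final case class Event(message: String, terminal: Boolean = false)

class BoundedEventSender(queueCapacity: Int, send: Event => Unit) {
  private val queue = new ArrayBlockingQueue[Event](queueCapacity)

  // A background thread drains the queue and pushes events to the client,
  // so callers of sendEvent never block on the network.
  private val worker = new Thread(() => {
    var running = true
    while (running) {
      val event = queue.poll(100, TimeUnit.MILLISECONDS)
      if (event != null) {
        send(event)
        if (event.terminal) running = false
      }
    }
  })
  worker.setDaemon(true)
  worker.start()

  // offer returns false (and the event is dropped) when the queue is full.
  def sendEvent(event: Event): Boolean = queue.offer(event)
}

object BoundedEventSenderDemo extends App {
  val sender = new BoundedEventSender(queueCapacity = 1000, e => println(e.message))
  sender.sendEvent(Event("flow started"))
  sender.sendEvent(Event("run completed", terminal = true))
  Thread.sleep(500) // give the worker a moment to drain the queue
}
```

`queueCapacity` here plays the role of the [spark.sql.pipelines.event.queue.capacity](./configuration-properties.md#PIPELINES_EVENT_QUEUE_CAPACITY) configuration property.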

## Creating Instance

`PipelineEventSender` takes the following to be created:

* <span id="responseObserver"> `StreamObserver[ExecutePlanResponse]`
* <span id="sessionHolder"> `SessionHolder`

`PipelineEventSender` is created when:

* `PipelinesHandler` is requested to [start a pipeline run](PipelinesHandler.md#startRun)

## queueCapacity { #queueCapacity }

`queueCapacity` is the value of the [spark.sql.pipelines.event.queue.capacity](./configuration-properties.md#PIPELINES_EVENT_QUEUE_CAPACITY) configuration property.

Used when:

* `PipelineEventSender` is requested to [determine whether to enqueue an event](#shouldEnqueueEvent)
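
For illustration (the exact lookup code is not shown here), the value can be read in `spark-shell` like any other Spark SQL configuration property, assuming `spark` is an active `SparkSession`:

```scala
// Read the capacity the same way any SQL configuration property is read.
// `spark` is the SparkSession available in spark-shell.
val queueCapacity: Int =
  spark.conf.get("spark.sql.pipelines.event.queue.capacity").toInt
```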

## Send Pipeline Execution Progress Event { #sendEvent }

```scala
sendEvent(
  event: PipelineEvent): Unit
```

`sendEvent`...FIXME

---

`sendEvent` is used when:

* `PipelinesHandler` is requested to [start a pipeline run](PipelinesHandler.md#startRun)

### shouldEnqueueEvent { #shouldEnqueueEvent }

```scala
shouldEnqueueEvent(
  event: PipelineEvent): Boolean
```

`shouldEnqueueEvent`...FIXME
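
Although the actual logic is not covered here yet, the [spark.sql.pipelines.event.queue.capacity](./configuration-properties.md#PIPELINES_EVENT_QUEUE_CAPACITY) description (non-terminal events are dropped when the queue is full) hints at a policy along these lines. The snippet below is a simplified, hypothetical model with made-up types, not the real method:

```scala
object EnqueuePolicy {
  // Hypothetical model: terminal events (e.g. run completion) are always
  // worth enqueueing, while non-terminal progress events are dropped once
  // the queue has reached its configured capacity.
  final case class QueueState(size: Int, capacity: Int)

  def shouldEnqueue(isTerminalEvent: Boolean, state: QueueState): Boolean =
    isTerminalEvent || state.size < state.capacity
}

// EnqueuePolicy.shouldEnqueue(isTerminalEvent = false, EnqueuePolicy.QueueState(1000, 1000)) // false
// EnqueuePolicy.shouldEnqueue(isTerminalEvent = true, EnqueuePolicy.QueueState(1000, 1000))  // true
```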

docs/declarative-pipelines/PipelinesHandler.md

Lines changed: 5 additions & 1 deletion
@@ -1,3 +1,7 @@
+---
+subtitle: Spark Connect Endpoint
+---
+
 # PipelinesHandler

 `PipelinesHandler` is used to [handle pipeline commands](#handlePipelinesCommand) in [Spark Connect]({{ book.spark_connect }}) ([SparkConnectPlanner]({{ book.spark_connect }}/server/SparkConnectPlanner), precisely).

@@ -87,7 +91,7 @@ startRun(

 `startRun` finds the [GraphRegistrationContext](GraphRegistrationContext.md) by `dataflowGraphId` in the [DataflowGraphRegistry](DataflowGraphRegistry.md) (in the given `SessionHolder`).

-`startRun` creates a `PipelineEventSender` to send pipeline events back to the Spark Connect client (_Python pipeline runtime_).
+`startRun` creates a [PipelineEventSender](PipelineEventSender.md) to [send pipeline execution progress events back to the Spark Connect client](PipelineEventSender.md#sendEvent) (_Python pipeline runtime_).

 `startRun` creates a [PipelineUpdateContextImpl](PipelineUpdateContextImpl.md) (with the `PipelineEventSender`).

docs/declarative-pipelines/configuration-properties.md

Lines changed: 17 additions & 0 deletions

@@ -0,0 +1,17 @@
# Configuration Properties

**Configuration properties** (aka **settings**) for [Spark Declarative Pipelines](index.md).

## <span id="PIPELINES_EVENT_QUEUE_CAPACITY"> event.queue.capacity { #spark.sql.pipelines.event.queue.capacity }

**spark.sql.pipelines.event.queue.capacity**

Capacity of the event queue of a pipeline execution. When the queue is full, non-terminal `PipelineEvent`s will be dropped.

Default: `1000`

Use [SQLConf.PIPELINES_EVENT_QUEUE_CAPACITY](../SQLConf.md#PIPELINES_EVENT_QUEUE_CAPACITY) to reference it.

Used when:

* `PipelineEventSender` is [created](PipelineEventSender.md#queueCapacity)
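
Being a regular Spark SQL configuration property, it can be passed to `spark-submit` (`--conf spark.sql.pipelines.event.queue.capacity=2000`) or set when the `SparkSession` is built. A hypothetical standalone example:

```scala
import org.apache.spark.sql.SparkSession

object EventQueueCapacityDemo {
  def main(args: Array[String]): Unit = {
    // Raise the pipeline event queue capacity from the default of 1000.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("event-queue-capacity-demo")
      .config("spark.sql.pipelines.event.queue.capacity", "2000")
      .getOrCreate()

    // Verify the effective value of the property.
    println(spark.conf.get("spark.sql.pipelines.event.queue.capacity"))

    spark.stop()
  }
}
```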
