python/docs/source/tutorial/sql/python_data_source.rst

Overview
--------
The Python Data Source API is a new feature introduced in Spark 4.0, enabling developers to read from custom data sources and write to custom data sinks in Python.
This guide provides a comprehensive overview of the API and instructions on how to create, use, and manage Python data sources.

Simple Example: Data Source with Batch Reader
---------------------------------------------
Here's a simple Python data source that generates exactly two rows of synthetic data.
This example demonstrates how to set up a custom data source without using external libraries, focusing on the essentials needed to get it up and running quickly.

**Step 1: Define the data source**

.. code-block:: python

    from typing import Iterator, Tuple

    from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition
    from pyspark.sql.types import IntegerType, StringType, StructField, StructType

    class SimpleDataSource(DataSource):
        """
        A simple data source for PySpark that generates exactly two rows of synthetic data.
        """

        @classmethod
        def name(cls) -> str:
            return "simple"

        def schema(self) -> StructType:
            return StructType([
                StructField("name", StringType()),
                StructField("age", IntegerType())
            ])

        def reader(self, schema: StructType) -> DataSourceReader:
            return SimpleDataSourceReader()

    class SimpleDataSourceReader(DataSourceReader):

        def read(self, partition: InputPartition) -> Iterator[Tuple]:
            yield ("Alice", 20)
            yield ("Bob", 30)



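The remaining steps register the data source and read from it. The snippet below is a minimal usage sketch, assuming the standard ``spark.dataSource.register`` API covered at the end of this guide; the expected output follows from the two rows yielded by ``SimpleDataSourceReader``.

.. code-block:: python

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Register the data source under the name returned by SimpleDataSource.name().
    spark.dataSource.register(SimpleDataSource)

    # Read from it like any other source.
    spark.read.format("simple").load().show()
    # +-----+---+
    # | name|age|
    # +-----+---+
    # |Alice| 20|
    # |  Bob| 30|
    # +-----+---+
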
Comprehensive Example: Data Source with Batch and Streaming Readers and Writers
-------------------------------------------------------------------------------
To create a custom Python data source, you'll need to subclass the :class:`DataSource` base class and implement the necessary methods for reading and writing data.

This example demonstrates creating a simple data source to generate synthetic data using the `faker` library. Ensure the `faker` library is installed and accessible in your Python environment.

Define the Data Source
~~~~~~~~~~~~~~~~~~~~~~

Start by creating a new subclass of :class:`DataSource` with the source name and schema.

To be used as a source or sink in a batch or streaming query, the corresponding methods of DataSource need to be implemented.

The method that needs to be implemented for each capability:

+------------+----------------------+------------------+
|            | source               | sink             |
+============+======================+==================+
| batch      | reader()             | writer()         |
+------------+----------------------+------------------+
| streaming  | streamReader() or    | streamWriter()   |
|            | simpleStreamReader() |                  |
+------------+----------------------+------------------+

.. code-block:: python

    from typing import Union

    from pyspark.sql.datasource import (
        DataSource,
        DataSourceReader,
        DataSourceStreamReader,
        DataSourceStreamWriter,
        DataSourceWriter
    )
    from pyspark.sql.types import StructType

    class FakeDataSource(DataSource):
        """
        A fake data source for PySpark to generate synthetic data using the `faker` library.
        """

        @classmethod
        def name(cls) -> str:
            return "fake"

        def schema(self) -> Union[StructType, str]:
            return "name string, date string, zipcode string, state string"

        def reader(self, schema: StructType) -> DataSourceReader:
            return FakeDataSourceReader(schema, self.options)

        def writer(self, schema: StructType, overwrite: bool) -> DataSourceWriter:
            return FakeDataSourceWriter(self.options)

        def streamReader(self, schema: StructType) -> DataSourceStreamReader:
            return FakeStreamReader(schema, self.options)

        def streamWriter(self, schema: StructType, overwrite: bool) -> DataSourceStreamWriter:
            return FakeStreamWriter(self.options)

Implement a Batch Reader
~~~~~~~~~~~~~~~~~~~~~~~~

Define the reader logic to generate synthetic data. Use the `faker` library to populate each field in the schema.

.. code-block:: python

    from typing import Dict

    class FakeDataSourceReader(DataSourceReader):

        def __init__(self, schema: StructType, options: Dict[str, str]):
            self.schema: StructType = schema
            self.options = options

        def read(self, partition):
            # The `faker` library is imported inside the method so that the reader
            # object itself remains picklable (see Serialization Requirement below).
            from faker import Faker

            fake = Faker()
            # Assumed option handling: the number of generated rows is controlled
            # by a "numRows" option, defaulting to 3. Every option value is a string.
            num_rows = int(self.options.get("numRows", 3))
            for _ in range(num_rows):
                row = []
                for field in self.schema.fields:
                    value = getattr(fake, field.name)()
                    row.append(value)
                yield tuple(row)

Implement a Batch Writer
~~~~~~~~~~~~~~~~~~~~~~~~

Create a fake data source writer that processes each partition of data, counts the rows, and either prints the total count of rows after a successful write or the number of failed tasks if the writing process fails.

.. code-block:: python

    from dataclasses import dataclass

    from pyspark.sql.datasource import DataSourceWriter, WriterCommitMessage


    @dataclass
    class SimpleCommitMessage(WriterCommitMessage):
        partition_id: int
        count: int

    class FakeDataSourceWriter(DataSourceWriter):

        def __init__(self, options):
            self.options = options

        def write(self, rows):
            """
            Counts the rows in this partition and reports the count back to the driver.
            """
            from pyspark import TaskContext

            partition_id = TaskContext.get().partitionId()
            cnt = sum(1 for _ in rows)
            return SimpleCommitMessage(partition_id=partition_id, count=cnt)

        def commit(self, messages):
            """
            Called when all write tasks succeed: prints the total count of written rows.
            """
            total_count = sum(message.count for message in messages)
            print(f"Total number of rows: {total_count}")

        def abort(self, messages):
            """
            Called when some write tasks fail: prints the number of failed tasks.
            """
            # Failed tasks have no commit message.
            failed_count = sum(message is None for message in messages)
            print(f"Number of failed tasks: {failed_count}")


Implement a Streaming Reader
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This is a dummy streaming data reader that generates 2 rows in every microbatch. The streamReader instance has an integer offset that increases by 2 in every microbatch.

.. code-block:: python

    class RangePartition(InputPartition):
        def __init__(self, start: int, end: int):
            self.start = start
            self.end = end

    class FakeStreamReader(DataSourceStreamReader):
        def __init__(self, schema, options):
            self.current = 0

        def initialOffset(self) -> dict:
            """
            Return the initial start offset of the reader.
            """
            return {"offset": 0}

        def latestOffset(self) -> dict:
            """
            Return the current latest offset that the next microbatch will read to.
            """
            self.current += 2
            return {"offset": self.current}

        def partitions(self, start: dict, end: dict) -> list[InputPartition]:
            """
            Plans the partitioning of the current microbatch defined by start and end offset,
            it needs to return a sequence of :class:`InputPartition` objects.
            """
            return [RangePartition(start["offset"], end["offset"])]

        def commit(self, end: dict) -> None:
            """
            This is invoked when the query has finished processing data before end offset, this can be used to clean up resources.
            """
            pass

        def read(self, partition):
            """
            Takes a partition as an input and reads an iterator of tuples from the data source.
            """
            start, end = partition.start, partition.end
            for i in range(start, end):
                yield (i, str(i))

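Once registered (see "Register a Python Data Source" below), the source can be used in a Structured Streaming query. The following is a minimal usage sketch assuming the ``spark.dataSource.register`` API and a console sink; note that a real reader should yield rows matching the schema returned by ``schema()``.

.. code-block:: python

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    spark.dataSource.register(FakeDataSource)

    # Each microbatch reads one RangePartition covering two offsets,
    # so two rows are emitted per batch until the query is stopped.
    query = spark.readStream.format("fake").load().writeStream.format("console").start()

    # ... let a few microbatches run, then stop the query.
    query.stop()
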
Alternative: Implement a Simple Streaming Reader
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If the data source has low throughput and doesn't require partitioning, you can implement SimpleDataSourceStreamReader instead of DataSourceStreamReader.

Either simpleStreamReader() or streamReader() must be implemented for a readable streaming data source. simpleStreamReader() will only be invoked when streamReader() is not implemented.

.. code-block:: python

    from pyspark.sql.datasource import SimpleDataSourceStreamReader


    class FakeDataSource(DataSource):
        ...

        def simpleStreamReader(self, schema: StructType) -> SimpleDataSourceStreamReader:
            return FakeSimpleStreamReader()

        # omit implementation of streamReader

        ...

This is the same dummy streaming reader that generates 2 rows every batch, implemented with the SimpleDataSourceStreamReader interface.

.. code-block:: python

    from typing import Iterator, Tuple

    from pyspark.sql.datasource import SimpleDataSourceStreamReader

    class FakeSimpleStreamReader(SimpleDataSourceStreamReader):
        def initialOffset(self) -> dict:
            """
            Return the initial start offset of the reader.
            """
            return {"offset": 0}

        def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
            """
            Takes start offset as an input, returns an iterator of tuples and the start offset of the next read.
            """
            start_idx = start["offset"]
            it = iter([(i,) for i in range(start_idx, start_idx + 2)])
            return (it, {"offset": start_idx + 2})

        def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
            """
            Takes start and end offsets as inputs, then reads an iterator of data deterministically.
            This is called when the query replays batches during restart or after a failure.
            """
            start_idx = start["offset"]
            end_idx = end["offset"]
            return iter([(i,) for i in range(start_idx, end_idx)])

        def commit(self, end: dict) -> None:
            """
            This is invoked when the query has finished processing data before end offset, this can be used to clean up resources.
            """
            pass

Implement a Streaming Writer
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This is a streaming data writer that writes the metadata information of each microbatch to a local path.

.. code-block:: python

    import json
    import os
    from dataclasses import dataclass
    from typing import Iterator, List, Optional

    from pyspark.sql import Row
    from pyspark.sql.datasource import DataSourceStreamWriter, WriterCommitMessage


    # SimpleCommitMessage is declared as a dataclass so that it can be constructed
    # with the keyword arguments used in write() below.
    @dataclass
    class SimpleCommitMessage(WriterCommitMessage):
        partition_id: int
        count: int

    class FakeStreamWriter(DataSourceStreamWriter):

        def __init__(self, options):
            self.options = options
            self.path = self.options.get("path")
            assert self.path is not None

        def write(self, iterator: Iterator[Row]) -> WriterCommitMessage:
            """
            Write the data and return the commit message of that partition.
            """
            from pyspark import TaskContext

            context = TaskContext.get()
            partition_id = context.partitionId()
            cnt = 0
            for row in iterator:
                cnt += 1
            return SimpleCommitMessage(partition_id=partition_id, count=cnt)

        def commit(self, messages: List[Optional[WriterCommitMessage]], batchId: int) -> None:
            """
            Receives a sequence of :class:`WriterCommitMessage` when all write tasks succeed and decides what to do with it.
            In this FakeStreamWriter, we write the metadata of the microbatch (number of rows and partitions) into a JSON file inside commit().
            """
            status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
            with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
                file.write(json.dumps(status) + "\n")

        def abort(self, messages: List[Optional[WriterCommitMessage]], batchId: int) -> None:
            """
            Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some tasks fail and decides what to do with it.
            In this FakeStreamWriter, we write a failure message into a txt file inside abort().
            """
            # The exact file name and message below are illustrative.
            with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
                file.write(f"failed in batch {batchId}")


Serialization Requirement
-------------------------
User-defined DataSource, DataSourceReader, DataSourceWriter, DataSourceStreamReader, and DataSourceStreamWriter classes and their methods must be serializable by pickle.

For libraries that are used inside a method, they must be imported inside the method. For example, TaskContext must be imported inside the read() method in the code below.

.. code-block:: python

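    # A minimal sketch (not the original snippet): TaskContext is imported inside
    # read() rather than at module level, so the reader object stays picklable.
    from pyspark.sql.datasource import DataSourceReader

    class MyDataSourceReader(DataSourceReader):

        def read(self, partition):
            from pyspark import TaskContext  # imported inside the method

            context = TaskContext.get()
            partition_id = context.partitionId()
            yield (partition_id, str(partition_id))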

Using a Python Data Source
--------------------------
**Register a Python Data Source**

After defining your data source, it must be registered before usage.

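For example, here is a minimal sketch of registering the fake data source and running a batch query against it (assuming the ``faker`` package is installed and the reader honors the ``numRows`` option shown earlier):

.. code-block:: python

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Register the data source under the name returned by FakeDataSource.name().
    spark.dataSource.register(FakeDataSource)

    # Batch read: generates synthetic rows with the faker library.
    spark.read.format("fake").option("numRows", 5).load().show()

    # Batch write: FakeDataSourceWriter counts the rows in each partition
    # and prints the total after a successful write.
    spark.range(10).write.format("fake").mode("append").save()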