1 change: 1 addition & 0 deletions .github/workflows/checks.yml
@@ -180,6 +180,7 @@ jobs:
uses: exasol/python-toolbox/.github/actions/python-environment@v1
with:
python-version: ${{ matrix.python-version }}
extras: "all"

- name: Run Tests and Collect Coverage
run: poetry run -- nox -s test:unit -- --coverage
2 changes: 1 addition & 1 deletion .github/workflows/slow-checks.yml
@@ -32,7 +32,7 @@ jobs:
python-version: ${{ matrix.python-version }}
extras: "all"

- name: "Setup custom host"
- name: Setup Custom Host
run: echo "127.0.0.1 exasol-test-database" | sudo tee -a /etc/hosts

- name: Run Tests and Collect Coverage
4 changes: 2 additions & 2 deletions README.md
@@ -19,7 +19,7 @@

PyExasol is the officially supported Python connector for [Exasol](https://www.exasol.com). It helps to handle massive volumes of data commonly associated with this DBMS.

You may expect significant performance improvement over ODBC in a single process scenario involving pandas or polars.
You may expect significant performance improvement over ODBC in a single process scenario involving pandas, parquet, or polars.

PyExasol provides an [API](https://exasol.github.io/pyexasol/master/api.html) to read & write multiple data streams in parallel using separate processes, which is necessary to fully utilize hardware and achieve linear scalability. With PyExasol you are no longer limited to a single CPU core.

@@ -32,7 +32,7 @@ PyExasol provides an [API](https://exasol.github.io/pyexasol/master/api.html) to

- Based on [WebSocket protocol](https://github.com/exasol/websocket-api);
- Optimized for minimum overhead;
- Easy integration with pandas and polars via HTTP transport;
- Easy integration with pandas, parquet, and polars via HTTP transport;
- Compression to reduce network bottleneck;
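
A minimal connection-and-transfer sketch of the HTTP transport integration described above (the DSN, credentials, and table name are placeholders, not taken from this repository):

```python
import pyexasol

# Placeholder connection details; compression=True addresses the network
# bottleneck mentioned in the feature list above.
C = pyexasol.connect(
    dsn="exasol-host:8563",
    user="sys",
    password="exasol",
    compression=True,
)

# Stream a query result into a pandas DataFrame via HTTP transport ...
df = C.export_to_pandas("SELECT * FROM users")

# ... and bulk-load a DataFrame back into an existing table.
C.import_from_pandas(df, "users")
```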


3 changes: 3 additions & 0 deletions doc/changes/unreleased.md
@@ -1 +1,4 @@
# Unreleased

## ✨Features
* #208: Added `ExaConnection.import_from_parquet` which can read parquet file(s)
23 changes: 23 additions & 0 deletions doc/user_guide/exploring_features/import_and_export/index.rst
@@ -134,6 +134,29 @@ See :meth:`pyexasol.ExaConnection.import_from_pandas`.

    C.import_from_pandas(pd, "users")

.. _parquet_export_import:

Parquet
^^^^^^^

Import
""""""
See :meth:`pyexasol.ExaConnection.import_from_parquet`.

.. code-block:: python

    from pathlib import Path

    # list[Path]: list of specific parquet files to load
    C.import_from_parquet([Path("local_path/test.parquet")], "users")

    # Path: can be either a file or a directory. If it's a directory,
    # all files matching the pattern *.parquet will be processed.
    C.import_from_parquet(Path("local_path/test.parquet"), "users")

    # str: a filepath which already contains a glob pattern
    C.import_from_parquet("local_path/*.parquet", "users")

.. _polars_export_import:

Polars
7 changes: 4 additions & 3 deletions doc/user_guide/getting_started.rst
@@ -11,7 +11,7 @@ massive volumes of data with efficiency, providing a performance boost over traditional
Why Choose PyExasol?
--------------------
* Easy and fast access to Exasol from Python
* Bulk import and export from/to :ref:`pandas <pandas_export_import>` and :ref:`polars <polars_export_import>` to Exasol
* Bulk import and export from/to :ref:`pandas <pandas_export_import>`, :ref:`parquet <parquet_export_import>`, and :ref:`polars <polars_export_import>` to Exasol
* Exasol UDF debugging support

Prerequisites
@@ -26,8 +26,9 @@ Optional Dependencies
^^^^^^^^^^^^^^^^^^^^^

- ``orjson`` is required for ``json_lib=orjson`` to improve JSON parsing performance
- ``pandas`` is required for :ref:`http_transport` functions working with :class:`pandas.DataFrame`
- ``polars`` is required for :ref:`http_transport` functions working with :class:`polars.DataFrame`
- ``pandas`` is required for :ref:`importing_and_exporting_data` functions working with :class:`pandas.DataFrame`
- ``polars`` is required for :ref:`importing_and_exporting_data` functions working with :class:`polars.DataFrame`
- ``pyarrow`` is required for :ref:`importing_and_exporting_data` functions working with :mod:`pyarrow.parquet`
- ``pproxy`` is used in the :ref:`examples` to test an HTTP proxy
- ``rapidjson`` is required for ``json_lib=rapidjson`` to improve JSON parsing performance
- ``ujson`` is required for ``json_lib=ujson`` to improve JSON parsing performance
81 changes: 75 additions & 6 deletions poetry.lock

Some generated files are not rendered by default.

108 changes: 89 additions & 19 deletions pyexasol/callback.py
@@ -19,9 +19,11 @@
"""

import csv
import glob
import io
import shutil
from collections.abc import Iterable
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Union,
@@ -93,33 +95,101 @@ def import_from_pandas(pipe, src: "pandas.DataFrame", **kwargs):
    Basic example how to import from Pandas DataFrame
    Custom params for "to_csv" may be passed in **kwargs
    """
    import packaging.version
    import pandas

    if not isinstance(src, pandas.DataFrame):
        raise ValueError("Data source is not pandas.DataFrame")

    wrapped_pipe = io.TextIOWrapper(pipe, newline="\n", encoding="utf-8")
    src.to_csv(
        wrapped_pipe,
        header=False,
        index=False,
        lineterminator="\n",
        quoting=csv.QUOTE_NONNUMERIC,
        **kwargs,
    )

    # https://github.com/pandas-dev/pandas/pull/45302
    if packaging.version.parse(pandas.__version__) >= packaging.version.parse("1.5.0"):
        return src.to_csv(
            wrapped_pipe,
            header=False,
            index=False,
            lineterminator="\n",
            quoting=csv.QUOTE_NONNUMERIC,
            **kwargs
        )
    else:
        return src.to_csv(
            wrapped_pipe,
            header=False,
            index=False,
            line_terminator="\n",
            quoting=csv.QUOTE_NONNUMERIC,
            **kwargs
        )

def get_parquet_files(source: Union[list[Path], Path, str]) -> list[Path]:
    if isinstance(source, str):
        matches = glob.glob(source)
        return sorted(
            [filepath for i in matches if (filepath := Path(i)) and filepath.is_file()]
        )
    if isinstance(source, Path):
        if source.is_file():
            return [source]
        elif source.is_dir():
            return sorted(source.glob("*.parquet"))
    if isinstance(source, list):
        not_a_valid_path_file = [
            filepath
            for filepath in source
            if not isinstance(filepath, Path) or not filepath.is_file()
        ]
        if len(not_a_valid_path_file) > 0:
            raise ValueError(
                f"source {source} contained entries which were not `Path` or valid `Path` files {not_a_valid_path_file}"
            )
        return source
    raise ValueError(
        f"source {source} is not a supported type (Union[list[Path], Path, str])."
    )


def import_from_parquet(
    pipe, source: Union[list[Path], Path, str], **kwargs
):  # NOSONAR(S3776)
    """
    Basic example how to import from pyarrow parquet file(s)

    Args:
        source: Local filepath specification(s) to process. Can be one of:
            - list[pathlib.Path]: list of specific files
            - pathlib.Path: can be either a file or directory. If it is a directory,
              all files matching the following pattern *.parquet will be processed.
            - str: representing a filepath which already contains a glob pattern
              (e.g., "/local_dir/*.parquet")
        **kwargs:
            Custom params for :func:`pyarrow.parquet.Table.iter_batches`. This can be used
            to specify what columns should be read and their preferred order.

    Please note that nested or hierarchical column types are not supported.
    """
    from pyarrow import (
        csv,
        parquet,
        types,
    )

    def ensure_no_nested_columns(
        schema, requested_columns: Union[list[str], None]
    ) -> None:
        nested_fields = []
        for field in schema:
            if not types.is_nested(field.type):
                continue
            if requested_columns and field.name in requested_columns:
                nested_fields.append(field)
            if not requested_columns:
                nested_fields.append(field)

        if nested_fields:
            raise ValueError(
                f"Fields {nested_fields} of schema from file {file} is hierarchical which is not supported."
            )

    if not (parquet_files := get_parquet_files(source)):
        raise ValueError(f"source {source} does not match any files")

    columns = kwargs.get("columns", None)
    for file in parquet_files:
        parquet_file = parquet.ParquetFile(file, memory_map=True)
        ensure_no_nested_columns(parquet_file.schema_arrow, columns)
        for batch in parquet_file.iter_batches(**kwargs):
            write_options = csv.WriteOptions(include_header=False)
            csv.write_csv(batch, pipe, write_options=write_options)


def import_from_polars(
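
For reference, a self-contained sketch of the batch-to-CSV technique the new callback uses: it writes a tiny parquet file with pyarrow and then streams it back as headerless CSV, the same way `import_from_parquet` feeds the HTTP transport pipe. The file name and column names below are invented for the demo.

```python
import io

import pyarrow as pa
import pyarrow.csv
import pyarrow.parquet as pq

# Write a small demo parquet file (path and columns are made up for the example).
table = pa.table({"id": [1, 2, 3], "name": ["ada", "bob", "eve"]})
pq.write_table(table, "demo.parquet")

# Stream it back batch by batch as headerless CSV, mirroring what the
# import_from_parquet callback writes into the HTTP transport pipe.
sink = io.BytesIO()  # stands in for the pipe object
parquet_file = pq.ParquetFile("demo.parquet", memory_map=True)
for batch in parquet_file.iter_batches(columns=["id", "name"]):
    options = pa.csv.WriteOptions(include_header=False)
    pa.csv.write_csv(batch, sink, write_options=options)

print(sink.getvalue().decode())  # CSV rows, one line per record, no header
```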