Commit 2c2b6f6

feat: Add support for deltalake (#134)
1 parent 4fd1b4d commit 2c2b6f6

20 files changed: +3071 -1643 lines

dataframely/_compat.py

Lines changed: 11 additions & 0 deletions
@@ -13,6 +13,16 @@ def __getattr__(self, name: str) -> Any:
         raise ValueError(f"Module '{self.module}' is not installed.")
 
 
+# ------------------------------------ DELTALAKE ------------------------------------- #
+
+try:
+    import deltalake
+    from deltalake import DeltaTable
+except ImportError:  # pragma: no cover
+    deltalake = _DummyModule("deltalake")  # type: ignore
+
+    class DeltaTable:  # type: ignore # noqa: N801
+        pass
 # ------------------------------------ SQLALCHEMY ------------------------------------ #
 
 
 try:
@@ -47,6 +57,7 @@ class Dialect:  # type: ignore # noqa: N801
 # ------------------------------------------------------------------------------------ #
 
 __all__ = [
+    "deltalake",
     "sa",
     "sa_mssql",
     "sa_TypeEngine",

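The guarded import follows the same dummy-module pattern as the SQLALCHEMY block below it: importing dataframely always succeeds, and the error surfaces only when deltalake is actually touched. A minimal standalone sketch of that pattern, assuming _DummyModule's only job is to defer the error until first attribute access (consistent with the __getattr__ shown in the hunk header):

from typing import Any


class _DummyModule:
    """Stand-in for an uninstalled optional dependency."""

    def __init__(self, module: str) -> None:
        self.module = module

    def __getattr__(self, name: str) -> Any:
        # Only reached when normal attribute lookup fails, i.e. on first use.
        raise ValueError(f"Module '{self.module}' is not installed.")


try:
    import deltalake
except ImportError:
    deltalake = _DummyModule("deltalake")  # type: ignore

# If deltalake is absent, touching e.g. deltalake.CommitProperties(...) raises
# ValueError: Module 'deltalake' is not installed.
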
dataframely/_storage/_base.py

Lines changed: 7 additions & 1 deletion
@@ -191,8 +191,14 @@ def scan_failure_info(
     ) -> tuple[pl.LazyFrame, SerializedRules, SerializedSchema]:
         """Lazily read the failure info from the storage backend."""
 
-    @abstractmethod
     def read_failure_info(
         self, **kwargs: Any
     ) -> tuple[pl.DataFrame, SerializedRules, SerializedSchema]:
         """Read the failure info from the storage backend."""
+
+        lf, rule_metadata, schema_metadata = self.scan_failure_info(**kwargs)
+        return (
+            lf.collect(),
+            rule_metadata,
+            schema_metadata,
+        )
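With the @abstractmethod removed, read_failure_info becomes a concrete template-method default: a backend now only has to implement scan_failure_info and inherits the eager variant. A standalone sketch of the pattern (class names and data are illustrative, not from the commit):

from abc import ABC, abstractmethod

import polars as pl


class Backend(ABC):
    @abstractmethod
    def scan(self) -> pl.LazyFrame:
        """Backends must provide only the lazy read."""

    def read(self) -> pl.DataFrame:
        # Concrete default, mirroring the base class above: collect the lazy frame.
        return self.scan().collect()


class InMemoryBackend(Backend):
    def scan(self) -> pl.LazyFrame:
        return pl.LazyFrame({"a": [1, 2, 3]})


print(InMemoryBackend().read())  # eager read inherited from the default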

dataframely/_storage/_exc.py

Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
+# Copyright (c) QuantCo 2025-2025
+# SPDX-License-Identifier: BSD-3-Clause
+
+
+def assert_failure_info_metadata(metadata: str | None) -> str:
+    if metadata:
+        return metadata
+    raise ValueError(
+        "The required FailureInfo metadata was not found in the storage backend."
+    )
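The helper narrows str | None to str and raises one shared error for any missing metadata; note that the truthiness check rejects the empty string as well as None. A brief usage sketch (the lookups are illustrative):

from dataframely._storage._exc import assert_failure_info_metadata

metadata: dict[str, str] = {"dataframely_schema": '{"a": "Int64"}'}

ok = assert_failure_info_metadata(metadata.get("dataframely_schema"))
# ok == '{"a": "Int64"}', typed as str rather than str | None

assert_failure_info_metadata(metadata.get("dataframely_rule_columns"))
# -> ValueError: The required FailureInfo metadata was not found in the storage backend.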

dataframely/_storage/constants.py

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+# Copyright (c) QuantCo 2025-2025
+# SPDX-License-Identifier: BSD-3-Clause
+
+SCHEMA_METADATA_KEY = "dataframely_schema"
+COLLECTION_METADATA_KEY = "dataframely_collection"
+RULE_METADATA_KEY = "dataframely_rule_columns"
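These keys previously lived in parquet.py (they are removed there below) and are hoisted into a shared module so that the parquet and delta backends address the same metadata fields.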

dataframely/_storage/delta.py

Lines changed: 200 additions & 0 deletions
@@ -0,0 +1,200 @@
+# Copyright (c) QuantCo 2025-2025
+# SPDX-License-Identifier: BSD-3-Clause
+from __future__ import annotations
+
+from collections.abc import Iterable
+from pathlib import Path
+from typing import Any
+
+import polars as pl
+
+from dataframely._compat import deltalake
+
+from ._base import (
+    SerializedCollection,
+    SerializedRules,
+    SerializedSchema,
+    StorageBackend,
+)
+from ._exc import assert_failure_info_metadata
+from .constants import COLLECTION_METADATA_KEY, RULE_METADATA_KEY, SCHEMA_METADATA_KEY
+
+
+class DeltaStorageBackend(StorageBackend):
+    def sink_frame(
+        self, lf: pl.LazyFrame, serialized_schema: SerializedSchema, **kwargs: Any
+    ) -> None:
+        _raise_on_lazy_write()
+
+    def write_frame(
+        self, df: pl.DataFrame, serialized_schema: SerializedSchema, **kwargs: Any
+    ) -> None:
+        target = kwargs.pop("target")
+        metadata = kwargs.pop("metadata", {})
+        delta_write_options = kwargs.pop("delta_write_options", {})
+
+        # Delta lake does not allow partitioning if there is only one column.
+        # We dynamically remove this setting here to allow users to still specify it
+        # on the collection level without having to worry about each individual member.
+        if len(df.columns) < 2:
+            delta_write_options.pop("partition_by", None)
+
+        df.write_delta(
+            target,
+            delta_write_options=(
+                delta_write_options
+                | {
+                    "commit_properties": deltalake.CommitProperties(
+                        custom_metadata=metadata
+                        | {SCHEMA_METADATA_KEY: serialized_schema}
+                    ),
+                }
+            ),
+            **kwargs,
+        )
+
+    def scan_frame(self, **kwargs: Any) -> tuple[pl.LazyFrame, SerializedSchema | None]:
+        table = _to_delta_table(kwargs.pop("source"))
+        serialized_schema = _read_serialized_schema(table)
+        df = pl.scan_delta(table, **kwargs)
+        return df, serialized_schema
+
+    def read_frame(self, **kwargs: Any) -> tuple[pl.DataFrame, SerializedSchema | None]:
+        table = _to_delta_table(kwargs.pop("source"))
+        serialized_schema = _read_serialized_schema(table)
+        df = pl.read_delta(table, **kwargs)
+        return df, serialized_schema
+
+    # ------------------------------ Collections ---------------------------------------
+    def sink_collection(
+        self,
+        dfs: dict[str, pl.LazyFrame],
+        serialized_collection: SerializedCollection,
+        serialized_schemas: dict[str, str],
+        **kwargs: Any,
+    ) -> None:
+        _raise_on_lazy_write()
+
+    def write_collection(
+        self,
+        dfs: dict[str, pl.LazyFrame],
+        serialized_collection: SerializedCollection,
+        serialized_schemas: dict[str, str],
+        **kwargs: Any,
+    ) -> None:
+        uri = Path(kwargs.pop("target"))
+
+        # The collection schema is serialized as part of each member's commit metadata.
+        kwargs["metadata"] = kwargs.get("metadata", {}) | {
+            COLLECTION_METADATA_KEY: serialized_collection
+        }
+
+        for key, lf in dfs.items():
+            self.write_frame(
+                lf.collect(),
+                serialized_schema=serialized_schemas[key],
+                target=uri / key,
+                **kwargs,
+            )
+
+    def scan_collection(
+        self, members: Iterable[str], **kwargs: Any
+    ) -> tuple[dict[str, pl.LazyFrame], list[SerializedCollection | None]]:
+        uri = Path(kwargs.pop("source"))
+
+        data = {}
+        collection_types = []
+        for key in members:
+            member_uri = uri / key
+            if not deltalake.DeltaTable.is_deltatable(str(member_uri)):
+                continue
+            table = _to_delta_table(member_uri)
+            data[key] = pl.scan_delta(table, **kwargs)
+            collection_types.append(_read_serialized_collection(table))
+
+        return data, collection_types
+
+    def read_collection(
+        self, members: Iterable[str], **kwargs: Any
+    ) -> tuple[dict[str, pl.LazyFrame], list[SerializedCollection | None]]:
+        lazy, collection_types = self.scan_collection(members, **kwargs)
+        eager = {name: lf.collect().lazy() for name, lf in lazy.items()}
+        return eager, collection_types
+
+    # ------------------------------ Failure Info --------------------------------------
+    def sink_failure_info(
+        self,
+        lf: pl.LazyFrame,
+        serialized_rules: SerializedRules,
+        serialized_schema: SerializedSchema,
+        **kwargs: Any,
+    ) -> None:
+        _raise_on_lazy_write()
+
+    def write_failure_info(
+        self,
+        df: pl.DataFrame,
+        serialized_rules: SerializedRules,
+        serialized_schema: SerializedSchema,
+        **kwargs: Any,
+    ) -> None:
+        self.write_frame(
+            df,
+            serialized_schema,
+            metadata={
+                RULE_METADATA_KEY: serialized_rules,
+            },
+            **kwargs,
+        )
+
+    def scan_failure_info(
+        self, **kwargs: Any
+    ) -> tuple[pl.LazyFrame, SerializedRules, SerializedSchema]:
+        """Lazily read the failure info from the storage backend."""
+        table = _to_delta_table(kwargs.pop("source"))
+
+        # Metadata
+        serialized_rules = assert_failure_info_metadata(_read_serialized_rules(table))
+        serialized_schema = assert_failure_info_metadata(_read_serialized_schema(table))
+
+        # Data
+        lf = pl.scan_delta(table, **kwargs)
+
+        return lf, serialized_rules, serialized_schema
+
+
+def _raise_on_lazy_write() -> None:
+    raise NotImplementedError("Lazy writes are not currently supported for deltalake.")
+
+
+def _read_serialized_schema(table: deltalake.DeltaTable) -> SerializedSchema | None:
+    [last_commit] = table.history(limit=1)
+    return last_commit.get(SCHEMA_METADATA_KEY, None)
+
+
+def _read_serialized_collection(
+    table: deltalake.DeltaTable,
+) -> SerializedCollection | None:
+    [last_commit] = table.history(limit=1)
+    return last_commit.get(COLLECTION_METADATA_KEY, None)
+
+
+def _read_serialized_rules(
+    table: deltalake.DeltaTable,
+) -> SerializedRules | None:
+    [last_commit] = table.history(limit=1)
+    return last_commit.get(RULE_METADATA_KEY, None)
+
+
+def _to_delta_table(
+    table: Path | str | deltalake.DeltaTable,
+) -> deltalake.DeltaTable:
+    from deltalake import DeltaTable
+
+    match table:
+        case DeltaTable():
+            return table
+        case str() | Path():
+            return DeltaTable(table)
+        case _:
+            raise TypeError(f"Unsupported type {table!r}")
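The serialized schema never lands in the data itself: write_frame attaches it to the Delta commit via CommitProperties(custom_metadata=...), and the _read_serialized_* helpers recover it from the latest history entry, into which deltalake merges the custom metadata. A round-trip sketch of just that mechanism (path and payload are illustrative):

import polars as pl
from deltalake import CommitProperties, DeltaTable

df = pl.DataFrame({"a": [1, 2, 3]})
df.write_delta(
    "/tmp/dataframely_example",
    delta_write_options={
        "commit_properties": CommitProperties(
            custom_metadata={"dataframely_schema": '{"a": "Int64"}'}
        )
    },
)

# history(limit=1) returns the most recent commit; the destructuring also
# asserts that exactly one entry came back.
[last_commit] = DeltaTable("/tmp/dataframely_example").history(limit=1)
print(last_commit.get("dataframely_schema"))  # -> '{"a": "Int64"}'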

dataframely/_storage/parquet.py

Lines changed: 10 additions & 19 deletions
@@ -13,10 +13,8 @@
     SerializedSchema,
     StorageBackend,
 )
-
-SCHEMA_METADATA_KEY = "dataframely_schema"
-COLLECTION_METADATA_KEY = "dataframely_collection"
-RULE_METADATA_KEY = "dataframely_rule_columns"
+from ._exc import assert_failure_info_metadata
+from .constants import COLLECTION_METADATA_KEY, RULE_METADATA_KEY, SCHEMA_METADATA_KEY
 
 
 class ParquetStorageBackend(StorageBackend):
@@ -218,24 +216,17 @@ def scan_failure_info(
         self, **kwargs: Any
     ) -> tuple[pl.LazyFrame, SerializedRules, SerializedSchema]:
         file = kwargs.pop("file")
+
+        # Metadata
         metadata = pl.read_parquet_metadata(file)
-        schema_metadata = metadata.get(SCHEMA_METADATA_KEY)
+        serialized_schema = assert_failure_info_metadata(
+            metadata.get(SCHEMA_METADATA_KEY)
+        )
+        serialized_rules = assert_failure_info_metadata(metadata.get(RULE_METADATA_KEY))
 
-        rule_metadata = metadata.get(RULE_METADATA_KEY)
-        if schema_metadata is None or rule_metadata is None:
-            raise ValueError("The parquet file does not contain the required metadata.")
+        # Data
         lf = pl.scan_parquet(file, **kwargs)
-        return lf, rule_metadata, schema_metadata
-
-    def read_failure_info(
-        self, **kwargs: Any
-    ) -> tuple[pl.DataFrame, SerializedRules, SerializedSchema]:
-        lf, rule_metadata, schema_metadata = self.scan_failure_info(**kwargs)
-        return (
-            lf.collect(),
-            rule_metadata,
-            schema_metadata,
-        )
+        return lf, serialized_rules, serialized_schema
 
 
 def _read_serialized_collection(path: Path) -> SerializedCollection | None:
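On the read side the parquet backend now shares the delta backend's error path instead of its own ad-hoc ValueError, and its read_failure_info moves to the base class as the default shown earlier. A usage sketch of the metadata lookup, assuming a failure-info parquet file written by write_failure_info exists at the (hypothetical) path:

import polars as pl

from dataframely._storage._exc import assert_failure_info_metadata
from dataframely._storage.constants import RULE_METADATA_KEY, SCHEMA_METADATA_KEY

metadata = pl.read_parquet_metadata("failure_info.parquet")  # hypothetical file
serialized_schema = assert_failure_info_metadata(metadata.get(SCHEMA_METADATA_KEY))
serialized_rules = assert_failure_info_metadata(metadata.get(RULE_METADATA_KEY))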
