Skip to content

Commit 9e994dd

Browse files
blink1073 and bruno-firnkes
authored and committed
INTPYTHON-165 Refactor nested data handling (mongodb-labs#245)
1 parent 5890b4c commit 9e994dd

File tree

14 files changed

+815
-969
lines changed

.github/workflows/benchmark.yml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ jobs:
5252
tox -e benchmark -- --set-commit-hash $(git rev-parse HEAD)
5353
}
5454
55-
pip install asv
55+
pip install asv virtualenv
5656
asv machine --yes
5757
git fetch origin main:main
5858
git update-ref refs/bm/pr HEAD
@@ -67,7 +67,8 @@ jobs:
6767
6868
- name: Compare benchmarks
6969
run: |
70-
asv compare refs/bm/merge-target refs/bm/pr --
70+
asv compare --factor 1.2 --split refs/bm/merge-target refs/bm/pr --
71+
7172
- name: Fail if any benchmarks have slowed down too much
7273
run: |
73-
! asv compare --factor 1.2 --split refs/bm/merge-target refs/bm/pr | grep -q "got worse"
74+
! asv compare --factor 1.2 --split refs/bm/merge-target refs/bm/pr 2> /dev/null | grep -q "got worse"

.github/workflows/release-python.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ jobs:
123123
export LIBBSON_INSTALL_DIR="$(pwd)/libbson"
124124
python -m pip install dist/*.gz
125125
cd ..
126-
python -c "from pymongoarrow.lib import process_bson_stream"
126+
python -c "from pymongoarrow.lib import libbson_version"
127127
128128
- uses: actions/upload-artifact@v4
129129
with:

.pre-commit-config.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,12 @@ repos:
9494
stages: [manual]
9595
args: ["--no-strict-imports"]
9696

97+
- repo: https://github.com/MarcoGorelli/cython-lint
98+
rev: v0.16.2
99+
hooks:
100+
- id: cython-lint
101+
args: ["--no-pycodestyle"]
102+
97103
- repo: https://github.com/codespell-project/codespell
98104
rev: "v2.2.6"
99105
hooks:

bindings/python/asv.conf.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,6 @@
1010
"N_DOCS": ["20000", "1000"]
1111
}
1212
},
13-
"environment_type": "virtualenv"
13+
"environment_type": "virtualenv",
14+
"plugins": ["virtualenv"]
1415
}

bindings/python/benchmarks/benchmarks.py

Lines changed: 16 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,6 @@ class Read(ABC):
114114
def setup(self):
115115
raise NotImplementedError
116116

117-
# We need this because the naive methods don't always convert nested objects.
118-
@staticmethod # noqa: B027
119-
def exercise_table(table):
120-
pass
121-
122117
def time_conventional_ndarray(self):
123118
collection = db.benchmark
124119
cursor = collection.find(projection={"_id": 0})
@@ -147,13 +142,11 @@ def time_to_pandas(self):
147142
def time_conventional_arrow(self):
148143
c = db.benchmark
149144
f = list(c.find({}, projection={"_id": 0}))
150-
table = pa.Table.from_pylist(f)
151-
self.exercise_table(table)
145+
pa.Table.from_pylist(f)
152146

153147
def time_to_arrow(self):
154148
c = db.benchmark
155-
table = find_arrow_all(c, {}, schema=self.schema, projection={"_id": 0})
156-
self.exercise_table(table)
149+
find_arrow_all(c, {}, schema=self.schema, projection={"_id": 0})
157150

158151
def time_conventional_polars(self):
159152
collection = db.benchmark
@@ -211,27 +204,25 @@ def setup(self):
211204
% (N_DOCS, len(BSON.encode(base_dict)) // 1024, len(base_dict))
212205
)
213206

214-
# We need this because the naive methods don't always convert nested objects.
215-
@staticmethod
216-
def exercise_table(table):
217-
[
218-
[[n for n in i.values] if isinstance(i, pa.ListScalar) else i for i in column]
219-
for column in table.columns
220-
]
221-
222-
# All of the following tests are being skipped because NumPy/Pandas do not work with nested arrays.
207+
# All of the following tests are being skipped because NumPy/Pandas/Polars do not work with nested arrays.
223208
def time_to_numpy(self):
224209
pass
225210

226211
def time_to_pandas(self):
227212
pass
228213

214+
def time_to_polars(self):
215+
pass
216+
229217
def time_conventional_ndarray(self):
230218
pass
231219

232220
def time_conventional_pandas(self):
233221
pass
234222

223+
def time_conventional_polars(self):
224+
pass
225+
235226

236227
class ProfileReadDocument(Read):
237228
schema = Schema(
@@ -260,27 +251,25 @@ def setup(self):
260251
% (N_DOCS, len(BSON.encode(base_dict)) // 1024, len(base_dict))
261252
)
262253

263-
# We need this because the naive methods don't always convert nested objects.
264-
@staticmethod
265-
def exercise_table(table):
266-
[
267-
[[n for n in i.values()] if isinstance(i, pa.StructScalar) else i for i in column]
268-
for column in table.columns
269-
]
270-
271-
# All of the following tests are being skipped because NumPy/Pandas do not work with nested documents.
254+
# All of the following tests are being skipped because NumPy/Pandas/Polars do not work with nested documents.
272255
def time_to_numpy(self):
273256
pass
274257

275258
def time_to_pandas(self):
276259
pass
277260

261+
def time_to_polars(self):
262+
pass
263+
278264
def time_conventional_ndarray(self):
279265
pass
280266

281267
def time_conventional_pandas(self):
282268
pass
283269

270+
def time_conventional_polars(self):
271+
pass
272+
284273

285274
class ProfileReadSmall(Read):
286275
schema = Schema({"x": pa.int64(), "y": pa.float64()})

bindings/python/pymongoarrow/api.py

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,6 @@
3838
from pymongoarrow.schema import Schema
3939
from pymongoarrow.types import _validate_schema, get_numpy_type
4040

41-
try: # noqa: SIM105
42-
from pymongoarrow.lib import process_bson_stream
43-
except ImportError:
44-
pass
45-
4641
__all__ = [
4742
"aggregate_arrow_all",
4843
"find_arrow_all",
@@ -93,7 +88,7 @@ def find_arrow_all(collection, query, *, schema=None, **kwargs):
9388
:Returns:
9489
An instance of class:`pyarrow.Table`.
9590
"""
96-
context = PyMongoArrowContext.from_schema(schema, codec_options=collection.codec_options)
91+
context = PyMongoArrowContext(schema, codec_options=collection.codec_options)
9792

9893
for opt in ("cursor_type",):
9994
if kwargs.pop(opt, None):
@@ -108,7 +103,7 @@ def find_arrow_all(collection, query, *, schema=None, **kwargs):
108103

109104
raw_batch_cursor = collection.find_raw_batches(query, **kwargs)
110105
for batch in raw_batch_cursor:
111-
process_bson_stream(batch, context)
106+
context.process_bson_stream(batch)
112107

113108
return context.finish()
114109

@@ -131,7 +126,7 @@ def aggregate_arrow_all(collection, pipeline, *, schema=None, **kwargs):
131126
:Returns:
132127
An instance of class:`pyarrow.Table`.
133128
"""
134-
context = PyMongoArrowContext.from_schema(schema, codec_options=collection.codec_options)
129+
context = PyMongoArrowContext(schema, codec_options=collection.codec_options)
135130

136131
if pipeline and ("$out" in pipeline[-1] or "$merge" in pipeline[-1]):
137132
msg = (
@@ -152,7 +147,7 @@ def aggregate_arrow_all(collection, pipeline, *, schema=None, **kwargs):
152147

153148
raw_batch_cursor = collection.aggregate_raw_batches(pipeline, **kwargs)
154149
for batch in raw_batch_cursor:
155-
process_bson_stream(batch, context)
150+
context.process_bson_stream(batch)
156151

157152
return context.finish()
158153

bindings/python/pymongoarrow/context.py

Lines changed: 67 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -11,55 +11,15 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
from bson.codec_options import DEFAULT_CODEC_OPTIONS
15-
from pyarrow import Table, timestamp
14+
from pyarrow import ListArray, StructArray, Table
1615

1716
from pymongoarrow.types import _BsonArrowTypes, _get_internal_typemap
1817

19-
try:
20-
from pymongoarrow.lib import (
21-
BinaryBuilder,
22-
BoolBuilder,
23-
CodeBuilder,
24-
Date32Builder,
25-
Date64Builder,
26-
DatetimeBuilder,
27-
Decimal128Builder,
28-
DocumentBuilder,
29-
DoubleBuilder,
30-
Int32Builder,
31-
Int64Builder,
32-
ListBuilder,
33-
NullBuilder,
34-
ObjectIdBuilder,
35-
StringBuilder,
36-
)
37-
38-
_TYPE_TO_BUILDER_CLS = {
39-
_BsonArrowTypes.int32: Int32Builder,
40-
_BsonArrowTypes.int64: Int64Builder,
41-
_BsonArrowTypes.double: DoubleBuilder,
42-
_BsonArrowTypes.datetime: DatetimeBuilder,
43-
_BsonArrowTypes.objectid: ObjectIdBuilder,
44-
_BsonArrowTypes.decimal128: Decimal128Builder,
45-
_BsonArrowTypes.string: StringBuilder,
46-
_BsonArrowTypes.bool: BoolBuilder,
47-
_BsonArrowTypes.document: DocumentBuilder,
48-
_BsonArrowTypes.array: ListBuilder,
49-
_BsonArrowTypes.binary: BinaryBuilder,
50-
_BsonArrowTypes.code: CodeBuilder,
51-
_BsonArrowTypes.date32: Date32Builder,
52-
_BsonArrowTypes.date64: Date64Builder,
53-
_BsonArrowTypes.null: NullBuilder,
54-
}
55-
except ImportError:
56-
pass
57-
5818

5919
class PyMongoArrowContext:
6020
"""A context for converting BSON-formatted data to an Arrow Table."""
6121

62-
def __init__(self, schema, builder_map, codec_options=None):
22+
def __init__(self, schema, codec_options=None):
6323
"""Initialize the context.
6424
6525
:Parameters:
@@ -68,60 +28,85 @@ def __init__(self, schema, builder_map, codec_options=None):
6828
:class:`~pymongoarrow.builders._BuilderBase` instances.
6929
"""
7030
self.schema = schema
71-
self.builder_map = builder_map
7231
if self.schema is None and codec_options is not None:
7332
self.tzinfo = codec_options.tzinfo
7433
else:
7534
self.tzinfo = None
35+
schema_map = {}
36+
if self.schema is not None:
37+
str_type_map = _get_internal_typemap(schema.typemap)
38+
_parse_types(str_type_map, schema_map, self.tzinfo)
7639

40+
<<<<<<< HEAD
7741
self.raise_on_type_error = schema.raise_on_type_error if schema is not None else False
7842
self.raise_on_type_null = schema.raise_on_type_null if schema is not None else False
7943

8044
@classmethod
8145
def from_schema(cls, schema, codec_options=DEFAULT_CODEC_OPTIONS):
8246
"""Initialize the context from a :class:`~pymongoarrow.schema.Schema`
8347
instance.
48+
=======
49+
# Delayed import to prevent import errors for unbuilt library.
50+
from pymongoarrow.lib import BuilderManager
51+
>>>>>>> 5406fc3 (INTPYTHON-165 Refactor nested data handling (#245))

NOTE(review): this committed hunk adds unresolved Git merge-conflict markers
(`<<<<<<< HEAD`, `=======`, `>>>>>>> 5406fc3`) into
bindings/python/pymongoarrow/context.py (new lines 40–51). The conflict must be
resolved — these markers are a syntax error and will break the module at import.
8452

85-
:Parameters:
86-
- `schema`: Instance of :class:`~pymongoarrow.schema.Schema`.
87-
- `codec_options` (optional): An instance of
88-
:class:`~bson.codec_options.CodecOptions`.
89-
"""
90-
if schema is None:
91-
return cls(schema, {}, codec_options)
92-
93-
builder_map = {}
94-
tzinfo = codec_options.tzinfo
95-
str_type_map = _get_internal_typemap(schema.typemap)
96-
for fname, ftype in str_type_map.items():
97-
builder_cls = _TYPE_TO_BUILDER_CLS[ftype]
98-
encoded_fname = fname.encode("utf-8")
99-
100-
# special-case initializing builders for parameterized types
101-
if builder_cls == DatetimeBuilder:
102-
arrow_type = schema.typemap[fname]
103-
if tzinfo is not None and arrow_type.tz is None:
104-
arrow_type = timestamp(arrow_type.unit, tz=tzinfo)
105-
builder_map[encoded_fname] = DatetimeBuilder(dtype=arrow_type)
106-
elif builder_cls == DocumentBuilder:
107-
arrow_type = schema.typemap[fname]
108-
builder_map[encoded_fname] = DocumentBuilder(arrow_type, tzinfo)
109-
elif builder_cls == ListBuilder:
110-
arrow_type = schema.typemap[fname]
111-
builder_map[encoded_fname] = ListBuilder(arrow_type, tzinfo)
112-
elif builder_cls == BinaryBuilder:
113-
subtype = schema.typemap[fname].subtype
114-
builder_map[encoded_fname] = BinaryBuilder(subtype)
115-
else:
116-
builder_map[encoded_fname] = builder_cls()
117-
return cls(schema, builder_map)
53+
self.manager = BuilderManager(schema_map, self.schema is not None, self.tzinfo)
54+
55+
def process_bson_stream(self, stream):
56+
self.manager.process_bson_stream(stream, len(stream))
11857

11958
def finish(self):
120-
arrays = []
121-
names = []
122-
for fname, builder in self.builder_map.items():
123-
arrays.append(builder.finish())
124-
names.append(fname.decode("utf-8"))
59+
array_map = _parse_builder_map(self.manager.finish())
60+
arrays = list(array_map.values())
12561
if self.schema is not None:
12662
return Table.from_arrays(arrays=arrays, schema=self.schema.to_arrow())
127-
return Table.from_arrays(arrays=arrays, names=names)
63+
return Table.from_arrays(arrays=arrays, names=list(array_map.keys()))
64+
65+
66+
def _parse_builder_map(builder_map):
67+
# Handle nested builders.
68+
to_remove = []
69+
# Traverse the builder map right to left.
70+
for key, value in reversed(builder_map.items()):
71+
if value.type_marker == _BsonArrowTypes.document.value:
72+
names = value.finish()
73+
full_names = [f"{key}.{name}" for name in names]
74+
arrs = [builder_map[c] for c in full_names]
75+
builder_map[key] = StructArray.from_arrays(arrs, names=names)
76+
to_remove.extend(full_names)
77+
elif value.type_marker == _BsonArrowTypes.array.value:
78+
child_name = key + "[]"
79+
to_remove.append(child_name)
80+
child = builder_map[child_name]
81+
builder_map[key] = ListArray.from_arrays(value.finish(), child)
82+
else:
83+
builder_map[key] = value.finish()
84+
85+
for key in to_remove:
86+
if key in builder_map:
87+
del builder_map[key]
88+
89+
return builder_map
90+
91+
92+
def _parse_types(str_type_map, schema_map, tzinfo):
93+
for fname, (ftype, arrow_type) in str_type_map.items():
94+
schema_map[fname] = ftype, arrow_type
95+
96+
# special-case nested builders
97+
if ftype == _BsonArrowTypes.document.value:
98+
# construct a sub type map here
99+
sub_type_map = {}
100+
for i in range(arrow_type.num_fields):
101+
field = arrow_type[i]
102+
sub_name = f"{fname}.{field.name}"
103+
sub_type_map[sub_name] = field.type
104+
sub_type_map = _get_internal_typemap(sub_type_map)
105+
_parse_types(sub_type_map, schema_map, tzinfo)
106+
elif ftype == _BsonArrowTypes.array.value:
107+
sub_type_map = {}
108+
sub_name = f"{fname}[]"
109+
sub_value_type = arrow_type.value_type
110+
sub_type_map[sub_name] = sub_value_type
111+
sub_type_map = _get_internal_typemap(sub_type_map)
112+
_parse_types(sub_type_map, schema_map, tzinfo)

0 commit comments

Comments (0)