diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 52eb260c..1b99d402 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -51,7 +51,7 @@ jobs: tox -e benchmark -- --set-commit-hash $(git rev-parse HEAD) } - pip install asv + pip install asv virtualenv asv machine --yes git fetch origin main:main git update-ref refs/bm/pr HEAD @@ -66,7 +66,8 @@ jobs: - name: Compare benchmarks run: | - asv compare refs/bm/merge-target refs/bm/pr -- + asv compare --factor 1.2 --split refs/bm/merge-target refs/bm/pr -- + - name: Fail if any benchmarks have slowed down too much run: | - ! asv compare --factor 1.2 --split refs/bm/merge-target refs/bm/pr | grep -q "got worse" + ! asv compare --factor 1.2 --split refs/bm/merge-target refs/bm/pr 2> /dev/null | grep -q "got worse" diff --git a/.github/workflows/release-python.yml b/.github/workflows/release-python.yml index b401f75c..d3e9784e 100644 --- a/.github/workflows/release-python.yml +++ b/.github/workflows/release-python.yml @@ -123,7 +123,7 @@ jobs: export LIBBSON_INSTALL_DIR="$(pwd)/libbson" python -m pip install dist/*.gz cd .. - python -c "from pymongoarrow.lib import process_bson_stream" + python -c "from pymongoarrow.lib import libbson_version" - uses: actions/upload-artifact@v4 with: diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index d36b6732..97e476ee 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -94,6 +94,12 @@ repos: stages: [manual] args: ["--no-strict-imports"] +- repo: https://github.com/MarcoGorelli/cython-lint + rev: v0.16.2 + hooks: + - id: cython-lint + args: ["--no-pycodestyle"] + - repo: https://github.com/codespell-project/codespell rev: "v2.2.6" hooks: diff --git a/bindings/python/asv.conf.json b/bindings/python/asv.conf.json index eee9723f..f561cc55 100644 --- a/bindings/python/asv.conf.json +++ b/bindings/python/asv.conf.json @@ -10,5 +10,6 @@ "N_DOCS": ["20000", "1000"] } }, - "environment_type": "virtualenv" + "environment_type": "virtualenv", + "plugins": ["virtualenv"] } diff --git a/bindings/python/benchmarks/benchmarks.py b/bindings/python/benchmarks/benchmarks.py index 892c78ae..843b0e97 100644 --- a/bindings/python/benchmarks/benchmarks.py +++ b/bindings/python/benchmarks/benchmarks.py @@ -114,11 +114,6 @@ class Read(ABC): def setup(self): raise NotImplementedError - # We need this because the naive methods don't always convert nested objects. - @staticmethod # noqa: B027 - def exercise_table(table): - pass - def time_conventional_ndarray(self): collection = db.benchmark cursor = collection.find(projection={"_id": 0}) @@ -147,13 +142,11 @@ def time_to_pandas(self): def time_conventional_arrow(self): c = db.benchmark f = list(c.find({}, projection={"_id": 0})) - table = pa.Table.from_pylist(f) - self.exercise_table(table) + pa.Table.from_pylist(f) def time_to_arrow(self): c = db.benchmark - table = find_arrow_all(c, {}, schema=self.schema, projection={"_id": 0}) - self.exercise_table(table) + find_arrow_all(c, {}, schema=self.schema, projection={"_id": 0}) def time_conventional_polars(self): collection = db.benchmark @@ -211,27 +204,25 @@ def setup(self): % (N_DOCS, len(BSON.encode(base_dict)) // 1024, len(base_dict)) ) - # We need this because the naive methods don't always convert nested objects. - @staticmethod - def exercise_table(table): - [ - [[n for n in i.values] if isinstance(i, pa.ListScalar) else i for i in column] - for column in table.columns - ] - - # All of the following tests are being skipped because NumPy/Pandas do not work with nested arrays. + # All of the following tests are being skipped because NumPy/Pandas/Polars do not work with nested arrays. def time_to_numpy(self): pass def time_to_pandas(self): pass + def time_to_polars(self): + pass + def time_conventional_ndarray(self): pass def time_conventional_pandas(self): pass + def time_conventional_polars(self): + pass + class ProfileReadDocument(Read): schema = Schema( @@ -260,27 +251,25 @@ def setup(self): % (N_DOCS, len(BSON.encode(base_dict)) // 1024, len(base_dict)) ) - # We need this because the naive methods don't always convert nested objects. - @staticmethod - def exercise_table(table): - [ - [[n for n in i.values()] if isinstance(i, pa.StructScalar) else i for i in column] - for column in table.columns - ] - - # All of the following tests are being skipped because NumPy/Pandas do not work with nested documents. + # All of the following tests are being skipped because NumPy/Pandas/Polars do not work with nested documents. def time_to_numpy(self): pass def time_to_pandas(self): pass + def time_to_polars(self): + pass + def time_conventional_ndarray(self): pass def time_conventional_pandas(self): pass + def time_conventional_polars(self): + pass + class ProfileReadSmall(Read): schema = Schema({"x": pa.int64(), "y": pa.float64()}) diff --git a/bindings/python/pymongoarrow/api.py b/bindings/python/pymongoarrow/api.py index f295648b..829b35f8 100644 --- a/bindings/python/pymongoarrow/api.py +++ b/bindings/python/pymongoarrow/api.py @@ -38,11 +38,6 @@ from pymongoarrow.schema import Schema from pymongoarrow.types import _validate_schema, get_numpy_type -try: # noqa: SIM105 - from pymongoarrow.lib import process_bson_stream -except ImportError: - pass - __all__ = [ "aggregate_arrow_all", "find_arrow_all", @@ -93,7 +88,7 @@ def find_arrow_all(collection, query, *, schema=None, **kwargs): :Returns: An instance of class:`pyarrow.Table`. """ - context = PyMongoArrowContext.from_schema(schema, codec_options=collection.codec_options) + context = PyMongoArrowContext(schema, codec_options=collection.codec_options) for opt in ("cursor_type",): if kwargs.pop(opt, None): @@ -108,7 +103,7 @@ def find_arrow_all(collection, query, *, schema=None, **kwargs): raw_batch_cursor = collection.find_raw_batches(query, **kwargs) for batch in raw_batch_cursor: - process_bson_stream(batch, context) + context.process_bson_stream(batch) return context.finish() @@ -131,7 +126,7 @@ def aggregate_arrow_all(collection, pipeline, *, schema=None, **kwargs): :Returns: An instance of class:`pyarrow.Table`. """ - context = PyMongoArrowContext.from_schema(schema, codec_options=collection.codec_options) + context = PyMongoArrowContext(schema, codec_options=collection.codec_options) if pipeline and ("$out" in pipeline[-1] or "$merge" in pipeline[-1]): msg = ( @@ -152,7 +147,7 @@ def aggregate_arrow_all(collection, pipeline, *, schema=None, **kwargs): raw_batch_cursor = collection.aggregate_raw_batches(pipeline, **kwargs) for batch in raw_batch_cursor: - process_bson_stream(batch, context) + context.process_bson_stream(batch) return context.finish() diff --git a/bindings/python/pymongoarrow/context.py b/bindings/python/pymongoarrow/context.py index 6b4208a0..e6da7abf 100644 --- a/bindings/python/pymongoarrow/context.py +++ b/bindings/python/pymongoarrow/context.py @@ -11,55 +11,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from bson.codec_options import DEFAULT_CODEC_OPTIONS -from pyarrow import Table, timestamp +from pyarrow import ListArray, StructArray, Table from pymongoarrow.types import _BsonArrowTypes, _get_internal_typemap -try: - from pymongoarrow.lib import ( - BinaryBuilder, - BoolBuilder, - CodeBuilder, - Date32Builder, - Date64Builder, - DatetimeBuilder, - Decimal128Builder, - DocumentBuilder, - DoubleBuilder, - Int32Builder, - Int64Builder, - ListBuilder, - NullBuilder, - ObjectIdBuilder, - StringBuilder, - ) - - _TYPE_TO_BUILDER_CLS = { - _BsonArrowTypes.int32: Int32Builder, - _BsonArrowTypes.int64: Int64Builder, - _BsonArrowTypes.double: DoubleBuilder, - _BsonArrowTypes.datetime: DatetimeBuilder, - _BsonArrowTypes.objectid: ObjectIdBuilder, - _BsonArrowTypes.decimal128: Decimal128Builder, - _BsonArrowTypes.string: StringBuilder, - _BsonArrowTypes.bool: BoolBuilder, - _BsonArrowTypes.document: DocumentBuilder, - _BsonArrowTypes.array: ListBuilder, - _BsonArrowTypes.binary: BinaryBuilder, - _BsonArrowTypes.code: CodeBuilder, - _BsonArrowTypes.date32: Date32Builder, - _BsonArrowTypes.date64: Date64Builder, - _BsonArrowTypes.null: NullBuilder, - } -except ImportError: - pass - class PyMongoArrowContext: """A context for converting BSON-formatted data to an Arrow Table.""" - def __init__(self, schema, builder_map, codec_options=None): + def __init__(self, schema, codec_options=None): """Initialize the context. :Parameters: @@ -68,57 +28,75 @@ def __init__(self, schema, builder_map, codec_options=None): :class:`~pymongoarrow.builders._BuilderBase` instances. """ self.schema = schema - self.builder_map = builder_map if self.schema is None and codec_options is not None: self.tzinfo = codec_options.tzinfo else: self.tzinfo = None + schema_map = {} + if self.schema is not None: + str_type_map = _get_internal_typemap(schema.typemap) + _parse_types(str_type_map, schema_map, self.tzinfo) - @classmethod - def from_schema(cls, schema, codec_options=DEFAULT_CODEC_OPTIONS): - """Initialize the context from a :class:`~pymongoarrow.schema.Schema` - instance. + # Delayed import to prevent import errors for unbuilt library. + from pymongoarrow.lib import BuilderManager - :Parameters: - - `schema`: Instance of :class:`~pymongoarrow.schema.Schema`. - - `codec_options` (optional): An instance of - :class:`~bson.codec_options.CodecOptions`. - """ - if schema is None: - return cls(schema, {}, codec_options) - - builder_map = {} - tzinfo = codec_options.tzinfo - str_type_map = _get_internal_typemap(schema.typemap) - for fname, ftype in str_type_map.items(): - builder_cls = _TYPE_TO_BUILDER_CLS[ftype] - encoded_fname = fname.encode("utf-8") - - # special-case initializing builders for parameterized types - if builder_cls == DatetimeBuilder: - arrow_type = schema.typemap[fname] - if tzinfo is not None and arrow_type.tz is None: - arrow_type = timestamp(arrow_type.unit, tz=tzinfo) - builder_map[encoded_fname] = DatetimeBuilder(dtype=arrow_type) - elif builder_cls == DocumentBuilder: - arrow_type = schema.typemap[fname] - builder_map[encoded_fname] = DocumentBuilder(arrow_type, tzinfo) - elif builder_cls == ListBuilder: - arrow_type = schema.typemap[fname] - builder_map[encoded_fname] = ListBuilder(arrow_type, tzinfo) - elif builder_cls == BinaryBuilder: - subtype = schema.typemap[fname].subtype - builder_map[encoded_fname] = BinaryBuilder(subtype) - else: - builder_map[encoded_fname] = builder_cls() - return cls(schema, builder_map) + self.manager = BuilderManager(schema_map, self.schema is not None, self.tzinfo) + + def process_bson_stream(self, stream): + self.manager.process_bson_stream(stream, len(stream)) def finish(self): - arrays = [] - names = [] - for fname, builder in self.builder_map.items(): - arrays.append(builder.finish()) - names.append(fname.decode("utf-8")) + array_map = _parse_builder_map(self.manager.finish()) + arrays = list(array_map.values()) if self.schema is not None: return Table.from_arrays(arrays=arrays, schema=self.schema.to_arrow()) - return Table.from_arrays(arrays=arrays, names=names) + return Table.from_arrays(arrays=arrays, names=list(array_map.keys())) + + +def _parse_builder_map(builder_map): + # Handle nested builders. + to_remove = [] + # Traverse the builder map right to left. + for key, value in reversed(builder_map.items()): + if value.type_marker == _BsonArrowTypes.document.value: + names = value.finish() + full_names = [f"{key}.{name}" for name in names] + arrs = [builder_map[c] for c in full_names] + builder_map[key] = StructArray.from_arrays(arrs, names=names) + to_remove.extend(full_names) + elif value.type_marker == _BsonArrowTypes.array.value: + child_name = key + "[]" + to_remove.append(child_name) + child = builder_map[child_name] + builder_map[key] = ListArray.from_arrays(value.finish(), child) + else: + builder_map[key] = value.finish() + + for key in to_remove: + if key in builder_map: + del builder_map[key] + + return builder_map + + +def _parse_types(str_type_map, schema_map, tzinfo): + for fname, (ftype, arrow_type) in str_type_map.items(): + schema_map[fname] = ftype, arrow_type + + # special-case nested builders + if ftype == _BsonArrowTypes.document.value: + # construct a sub type map here + sub_type_map = {} + for i in range(arrow_type.num_fields): + field = arrow_type[i] + sub_name = f"{fname}.{field.name}" + sub_type_map[sub_name] = field.type + sub_type_map = _get_internal_typemap(sub_type_map) + _parse_types(sub_type_map, schema_map, tzinfo) + elif ftype == _BsonArrowTypes.array.value: + sub_type_map = {} + sub_name = f"{fname}[]" + sub_value_type = arrow_type.value_type + sub_type_map[sub_name] = sub_value_type + sub_type_map = _get_internal_typemap(sub_type_map) + _parse_types(sub_type_map, schema_map, tzinfo) diff --git a/bindings/python/pymongoarrow/lib.pyx b/bindings/python/pymongoarrow/lib.pyx index 23189ed5..802ee829 100644 --- a/bindings/python/pymongoarrow/lib.pyx +++ b/bindings/python/pymongoarrow/lib.pyx @@ -17,479 +17,329 @@ # cython: language_level=3 # Stdlib imports -import copy -import datetime -import enum import sys -from math import isnan # Python imports import bson import numpy as np -from pyarrow import timestamp, struct, field -from pyarrow.lib import ( - tobytes, StructType, int32, int64, float64, string, bool_, list_ -) +from pyarrow import timestamp, default_memory_pool -from pymongoarrow.errors import InvalidBSON, PyMongoArrowError -from pymongoarrow.context import PyMongoArrowContext -from pymongoarrow.types import _BsonArrowTypes, _atypes, ObjectIdType, Decimal128Type as Decimal128Type_, BinaryType, CodeType +from pymongoarrow.errors import InvalidBSON +from pymongoarrow.types import ObjectIdType, Decimal128Type as Decimal128Type_, BinaryType, CodeType # Cython imports -from cpython cimport PyBytes_Size, object -from cython.operator cimport dereference, preincrement +from cpython cimport object from libcpp cimport bool as cbool -from libcpp.map cimport map +from libc.math cimport isnan from libcpp.string cimport string as cstring -from libc.string cimport strlen, memcpy -from libcpp.vector cimport vector +from libc.string cimport memcpy +from libcpp cimport nullptr from pyarrow.lib cimport * from pymongoarrow.libarrow cimport * from pymongoarrow.libbson cimport * +# Placeholder numbers for the date types. +# Keep in sync with _BsonArrowTypes in types.py. +cdef uint8_t ARROW_TYPE_DATE32 = 100 +cdef uint8_t ARROW_TYPE_DATE64 = 101 +cdef uint8_t ARROW_TYPE_NULL = 102 # libbson version libbson_version = bson_get_version().decode('utf-8') -# BSON tools cdef const bson_t* bson_reader_read_safe(bson_reader_t* stream_reader) except? NULL: cdef cbool reached_eof = False cdef const bson_t* doc = bson_reader_read(stream_reader, &reached_eof) - if doc == NULL and reached_eof is False: + if doc == NULL and not reached_eof: raise InvalidBSON("Could not read BSON document stream") return doc -# Placeholder numbers for the date types. -cdef uint8_t ARROW_TYPE_DATE32 = 100 -cdef uint8_t ARROW_TYPE_DATE64 = 101 -cdef uint8_t ARROW_TYPE_NULL = 102 +cdef class BuilderManager: + cdef: + dict builder_map + uint64_t count + bint has_schema + object tzinfo + object pool + + def __cinit__(self, dict schema_map, bint has_schema, object tzinfo): + self.has_schema = has_schema + self.tzinfo = tzinfo + self.count = 0 + self.builder_map = {} + self.pool = default_memory_pool() + # Unpack the schema map. + for fname, (ftype, arrow_type) in schema_map.items(): + name = fname.encode('utf-8') + # special-case initializing builders for parameterized types + if ftype == BSON_TYPE_DATE_TIME: + if tzinfo is not None and arrow_type.tz is None: + arrow_type = timestamp(arrow_type.unit, tz=tzinfo) # noqa: PLW2901 + self.builder_map[name] = DatetimeBuilder(dtype=arrow_type, memory_pool=self.pool) + elif ftype == BSON_TYPE_BINARY: + self.builder_map[name] = BinaryBuilder(arrow_type.subtype, memory_pool=self.pool) + else: + # We only use the doc_iter for binary arrays, which are handled already. + self.get_builder(name, ftype, nullptr) + + cdef _ArrayBuilderBase get_builder(self, cstring key, bson_type_t value_t, bson_iter_t * doc_iter) except *: + cdef _ArrayBuilderBase builder = None + cdef bson_subtype_t subtype + cdef const uint8_t *val_buf = NULL + cdef uint32_t val_buf_len = 0 + + # Mark a null key as missing until we find it. + if value_t == BSON_TYPE_NULL: + self.builder_map[key] = None + return + + if builder is not None: + return builder + + # Handle the builders. + if value_t == BSON_TYPE_DATE_TIME: + if self.tzinfo is not None: + arrow_type = timestamp('ms', tz=self.tzinfo) + builder = DatetimeBuilder(dtype=arrow_type, memory_pool=self.pool) + else: + builder = DatetimeBuilder(memory_pool=self.pool) + elif value_t == BSON_TYPE_DOCUMENT: + builder = DocumentBuilder() + elif value_t == BSON_TYPE_ARRAY: + builder = ListBuilder(memory_pool=self.pool) + elif value_t == BSON_TYPE_BINARY: + if doc_iter == NULL: + raise ValueError('Did not pass a doc_iter!') + bson_iter_binary (doc_iter, &subtype, + &val_buf_len, &val_buf) + builder = BinaryBuilder(subtype, memory_pool=self.pool) + elif value_t == ARROW_TYPE_DATE32: + builder = Date32Builder(memory_pool=self.pool) + elif value_t == ARROW_TYPE_DATE64: + builder = Date64Builder(memory_pool=self.pool) + elif value_t == BSON_TYPE_INT32: + builder = Int32Builder(memory_pool=self.pool) + elif value_t == BSON_TYPE_INT64: + builder = Int64Builder(memory_pool=self.pool) + elif value_t == BSON_TYPE_DOUBLE: + builder = DoubleBuilder(memory_pool=self.pool) + elif value_t == BSON_TYPE_OID: + builder = ObjectIdBuilder(memory_pool=self.pool) + elif value_t == BSON_TYPE_UTF8: + builder = StringBuilder(memory_pool=self.pool) + elif value_t == BSON_TYPE_BOOL: + builder = BoolBuilder(memory_pool=self.pool) + elif value_t == BSON_TYPE_DECIMAL128: + builder = Decimal128Builder(memory_pool=self.pool) + elif value_t == BSON_TYPE_CODE: + builder = CodeBuilder(memory_pool=self.pool) + + self.builder_map[key] = builder + return builder + + cdef void parse_document(self, bson_iter_t * doc_iter, cstring base_key, uint8_t parent_type) except *: + cdef bson_type_t value_t + cdef cstring key + cdef cstring full_key + cdef bson_iter_t child_iter + cdef uint64_t count = self.count + cdef _ArrayBuilderBase builder = None + + while bson_iter_next(doc_iter): + # Get the key and and value. + key = bson_iter_key(doc_iter) + value_t = bson_iter_type(doc_iter) + + # Get the appropriate full key. + if parent_type == BSON_TYPE_ARRAY: + full_key = base_key + full_key.append(b"[]") + + elif parent_type == BSON_TYPE_DOCUMENT: + full_key = base_key + full_key.append(b".") + full_key.append(key) + (self.builder_map[base_key]).add_field(key) + + else: + full_key = key + + # Get the builder. + builder = <_ArrayBuilderBase>self.builder_map.get(full_key, None) + if builder is None and not self.has_schema: + builder = self.get_builder(full_key, value_t, doc_iter) + if builder is None: + continue + + # Append nulls to catch up. + # For lists, the nulls are stored in the parent. + if parent_type != BSON_TYPE_ARRAY: + if count > builder.length(): + status = builder.append_nulls_raw(count - builder.length()) + if not status.ok(): + raise ValueError("Failed to append nulls to", full_key.decode('utf8')) + + # Append the next value. + status = builder.append_raw(doc_iter, value_t) + if not status.ok(): + raise ValueError("Could not append raw value to", full_key.decode('utf8')) + + # Recurse into documents. + if value_t == BSON_TYPE_DOCUMENT: + bson_iter_recurse(doc_iter, &child_iter) + self.parse_document(&child_iter, full_key, BSON_TYPE_DOCUMENT) + + # Recurse into arrays. + if value_t == BSON_TYPE_ARRAY: + bson_iter_recurse(doc_iter, &child_iter) + self.parse_document(&child_iter, full_key, BSON_TYPE_ARRAY) + + # If we're a list element, increment the offset counter. + if parent_type == BSON_TYPE_ARRAY: + (self.builder_map[base_key]).append_count() + + cpdef void process_bson_stream(self, const uint8_t* bson_stream, size_t length): + """Process a bson byte stream.""" + cdef bson_reader_t* stream_reader = bson_reader_new_from_data(bson_stream, length) + cdef const bson_t * doc = NULL + cdef bson_iter_t doc_iter + try: + while True: + doc = bson_reader_read_safe(stream_reader) + if doc == NULL: + break + if not bson_iter_init(&doc_iter, doc): + raise InvalidBSON("Could not read BSON document") + self.parse_document(&doc_iter, b"", 0) + self.count += 1 + finally: + bson_reader_destroy(stream_reader) + + cpdef finish(self): + """Finish appending to the builders.""" + cdef dict return_map = {} + cdef bytes key + cdef str field + cdef CStatus status + cdef _ArrayBuilderBase value + + # Move the builders to a new dict with string keys. + for key, value in self.builder_map.items(): + return_map[key.decode('utf-8')] = value + + # Insert null fields. + for field in list(return_map): + if return_map[field] is None: + return_map[field] = NullBuilder(memory_pool=self.pool) + + # Pad fields as needed. + for field, value in return_map.items(): + # If it isn't a list item, append nulls as needed. + # For lists, the nulls are stored in the parent. + if not field.endswith('[]'): + if value.length() < self.count: + status = value.append_nulls_raw(self.count - value.length()) + if not status.ok(): + raise ValueError("Failed to append nulls to", field) + + return return_map -_builder_type_map = { - BSON_TYPE_INT32: Int32Builder, - BSON_TYPE_INT64: Int64Builder, - BSON_TYPE_DOUBLE: DoubleBuilder, - BSON_TYPE_DATE_TIME: DatetimeBuilder, - BSON_TYPE_OID: ObjectIdBuilder, - BSON_TYPE_UTF8: StringBuilder, - BSON_TYPE_BOOL: BoolBuilder, - BSON_TYPE_DOCUMENT: DocumentBuilder, - BSON_TYPE_DECIMAL128: Decimal128Builder, - BSON_TYPE_ARRAY: ListBuilder, - BSON_TYPE_BINARY: BinaryBuilder, - BSON_TYPE_CODE: CodeBuilder, - ARROW_TYPE_DATE32: Date32Builder, - ARROW_TYPE_DATE64: Date64Builder, - ARROW_TYPE_NULL: NullBuilder -} - -_field_type_map = { - BSON_TYPE_INT32: int32(), - BSON_TYPE_INT64: int64(), - BSON_TYPE_DOUBLE: float64(), - BSON_TYPE_OID: ObjectIdType(), - BSON_TYPE_UTF8: string(), - BSON_TYPE_BOOL: bool_(), - BSON_TYPE_DECIMAL128: Decimal128Type_(), - BSON_TYPE_CODE: CodeType(), -} - - -cdef object extract_field_dtype(bson_iter_t * doc_iter, bson_iter_t * child_iter, bson_type_t value_t, object context): - """Get the appropriate data type for a specific field""" - cdef const uint8_t *val_buf = NULL - cdef uint32_t val_buf_len = 0 - cdef bson_subtype_t subtype - - if value_t in _field_type_map: - field_type = _field_type_map[value_t] - elif value_t == BSON_TYPE_ARRAY: - bson_iter_recurse(doc_iter, child_iter) - list_dtype = extract_array_dtype(child_iter, context) - field_type = list_(list_dtype) - elif value_t == BSON_TYPE_DOCUMENT: - bson_iter_recurse(doc_iter, child_iter) - field_type = extract_document_dtype(child_iter, context) - elif value_t == BSON_TYPE_DATE_TIME: - field_type = timestamp('ms', tz=context.tzinfo) - elif value_t == BSON_TYPE_BINARY: - bson_iter_binary (doc_iter, &subtype, &val_buf_len, &val_buf) - field_type = BinaryType(subtype) - elif value_t == BSON_TYPE_NULL: - field_type = None - else: - raise PyMongoArrowError('unknown value type {}'.format(value_t)) - return field_type - - -cdef object extract_document_dtype(bson_iter_t * doc_iter, object context): - """Get the appropriate data type for a sub document""" - cdef const char* key - cdef bson_type_t value_t - cdef bson_iter_t child_iter - fields = [] - while bson_iter_next(doc_iter): - key = bson_iter_key(doc_iter) - value_t = bson_iter_type(doc_iter) - field_type = extract_field_dtype(doc_iter, &child_iter, value_t, context) - if field_type is not None: - fields.append(field(key.decode('utf-8'), field_type)) - if fields: - return struct(fields) - return None - - -cdef object extract_array_dtype(bson_iter_t * doc_iter, object context): - """Get the appropriate data type for a sub array""" - cdef const char* key - cdef bson_type_t value_t - cdef bson_iter_t child_iter - fields = [] - while bson_iter_next(doc_iter): - value_t = bson_iter_type(doc_iter) - field_type = extract_field_dtype(doc_iter, &child_iter, value_t, context) - if field_type is not None: - return field_type - return None - - -def process_bson_stream(bson_stream, context, arr_value_builder=None): - """Process a bson byte stream using a PyMongoArrowContext""" - cdef const uint8_t* docstream = bson_stream - cdef size_t length = PyBytes_Size(bson_stream) - process_raw_bson_stream(bson_stream, length, context, arr_value_builder) - - -cdef void process_raw_bson_stream(const uint8_t * docstream, size_t length, object context, object arr_value_builder) except *: - cdef bson_reader_t* stream_reader = bson_reader_new_from_data(docstream, length) - cdef uint32_t str_len - cdef uint8_t dec128_buf[16] - cdef const uint8_t *val_buf = NULL - cdef uint32_t val_buf_len = 0 - cdef bson_decimal128_t dec128 - cdef bson_type_t value_t - cdef const char * bson_str = NULL - cdef StructType struct_dtype - cdef const bson_t * doc = NULL - cdef bson_iter_t doc_iter - cdef bson_iter_t child_iter - cdef const char* key - cdef uint8_t ftype - cdef Py_ssize_t count = 0 - cdef uint8_t byte_order_status = 0 - cdef map[cstring, void *] builder_map - cdef map[cstring, void *] missing_builders - cdef map[cstring, void*].iterator it - cdef bson_subtype_t subtype - cdef int32_t val32 - cdef int64_t val64 - - cdef _ArrayBuilderBase builder = None - cdef Int32Builder int32_builder - cdef DoubleBuilder double_builder - cdef ObjectIdBuilder objectid_builder - cdef StringBuilder string_builder - cdef CodeBuilder code_builder - cdef Int64Builder int64_builder - cdef BoolBuilder bool_builder - cdef BinaryBuilder binary_builder - cdef DatetimeBuilder datetime_builder - cdef Decimal128Builder dec128_builder - cdef ListBuilder list_builder - cdef DocumentBuilder doc_builder - cdef Date32Builder date32_builder - cdef Date64Builder date64_builder - cdef NullBuilder null_builder - - # Build up a map of the builders. - for key, value in context.builder_map.items(): - builder_map[key] = value - - # Initialize count to current length of builders. - if len(context.builder_map): - builder = next(iter(context.builder_map.values())) - count = len(builder) - - try: - while True: - doc = bson_reader_read_safe(stream_reader) - if doc == NULL: - break - if not bson_iter_init(&doc_iter, doc): - raise InvalidBSON("Could not read BSON document") - while bson_iter_next(&doc_iter): - key = bson_iter_key(&doc_iter) - builder = None - if arr_value_builder is not None: - builder = arr_value_builder - - if builder is None: - it = builder_map.find(key) - if it != builder_map.end(): - builder = <_ArrayBuilderBase>builder_map[key] - - if builder is None and context.schema is None: - # Get the appropriate builder for the current field. - value_t = bson_iter_type(&doc_iter) - builder_type = _builder_type_map.get(value_t) - - # Keep the key in missing builders until we find it. - if builder_type is None: - missing_builders[key] = None - continue - - it = missing_builders.find(key) - if it != builder_map.end(): - missing_builders.erase(key) - - # Handle the parameterized builders. - if builder_type == DatetimeBuilder and context.tzinfo is not None: - arrow_type = timestamp('ms', tz=context.tzinfo) - builder = DatetimeBuilder(dtype=arrow_type) - - elif builder_type == DocumentBuilder: - bson_iter_recurse(&doc_iter, &child_iter) - struct_dtype = extract_document_dtype(&child_iter, context) - if struct_dtype is None: - continue - builder = DocumentBuilder(struct_dtype, context.tzinfo) - elif builder_type == ListBuilder: - bson_iter_recurse(&doc_iter, &child_iter) - list_dtype = extract_array_dtype(&child_iter, context) - if list_dtype is None: - continue - list_dtype = list_(list_dtype) - builder = ListBuilder(list_dtype, context.tzinfo, value_builder=arr_value_builder) - elif builder_type == BinaryBuilder: - bson_iter_binary (&doc_iter, &subtype, - &val_buf_len, &val_buf) - builder = BinaryBuilder(subtype) - elif builder_type == Date32Builder: - builder = Date32Builder() - elif builder_type == Date64Builder: - builder = Date64Builder() - else: - builder = builder_type() - if arr_value_builder is None: - builder_map[key] = builder - context.builder_map[key] = builder - for _ in range(count): - builder.append_null() - - if builder is None: - continue - - ftype = builder.type_marker - value_t = bson_iter_type(&doc_iter) - if ftype == BSON_TYPE_INT32: - int32_builder = builder - if (value_t == BSON_TYPE_INT32 or value_t == BSON_TYPE_BOOL): - int32_builder.append_raw(bson_iter_as_int64(&doc_iter)) - elif value_t == BSON_TYPE_INT64: - val64 = bson_iter_as_int64(&doc_iter) - val32 = val64 - if val64 == val32: - int32_builder.append_raw(val32) - else: - # Use append (not append_raw) to surface overflow errors. - int32_builder.append(val64) - elif value_t == BSON_TYPE_DOUBLE: - # Treat nan as null. - val = bson_iter_as_double(&doc_iter) - if isnan(val): - int32_builder.append_null() - else: - # Use append (not append_raw) to surface overflow errors. - int32_builder.append(bson_iter_as_int64(&doc_iter)) - else: - int32_builder.append_null() - elif ftype == BSON_TYPE_INT64: - int64_builder = builder - if (value_t == BSON_TYPE_INT64 or - value_t == BSON_TYPE_BOOL or - value_t == BSON_TYPE_INT32): - int64_builder.append_raw(bson_iter_as_int64(&doc_iter)) - elif value_t == BSON_TYPE_DOUBLE: - # Treat nan as null. - val = bson_iter_as_double(&doc_iter) - if isnan(val): - int64_builder.append_null() - else: - int64_builder.append_raw(bson_iter_as_int64(&doc_iter)) - else: - int64_builder.append_null() - elif ftype == BSON_TYPE_OID: - objectid_builder = builder - if value_t == BSON_TYPE_OID: - objectid_builder.append_raw(bson_iter_oid(&doc_iter)) - else: - objectid_builder.append_null() - elif ftype == BSON_TYPE_UTF8: - string_builder = builder - if value_t == BSON_TYPE_UTF8: - bson_str = bson_iter_utf8(&doc_iter, &str_len) - string_builder.append_raw(bson_str, str_len) - else: - string_builder.append_null() - elif ftype == BSON_TYPE_CODE: - code_builder = builder - if value_t == BSON_TYPE_CODE: - bson_str = bson_iter_code(&doc_iter, &str_len) - code_builder.append_raw(bson_str, str_len) - else: - code_builder.append_null() - elif ftype == BSON_TYPE_DECIMAL128: - dec128_builder = builder - if value_t == BSON_TYPE_DECIMAL128: - bson_iter_decimal128(&doc_iter, &dec128) - if byte_order_status == 0: - if sys.byteorder == 'little': - byte_order_status = 1 - else: - byte_order_status = 2 - if byte_order_status == 1: - memcpy(dec128_buf, &dec128.low, 8); - memcpy(dec128_buf + 8, &dec128.high, 8) - dec128_builder.append_raw(dec128_buf) - else: - # We do not support big-endian systems. - dec128_builder.append_null() - else: - dec128_builder.append_null() - elif ftype == BSON_TYPE_DOUBLE: - double_builder = builder - if (value_t == BSON_TYPE_DOUBLE or - value_t == BSON_TYPE_BOOL or - value_t == BSON_TYPE_INT32 or - value_t == BSON_TYPE_INT64): - double_builder.append_raw(bson_iter_as_double(&doc_iter)) - else: - double_builder.append_null() - elif ftype == ARROW_TYPE_DATE32: - date32_builder = builder - if value_t == BSON_TYPE_DATE_TIME: - date32_builder.append_raw(bson_iter_date_time(&doc_iter)) - else: - date32_builder.append_null() - elif ftype == ARROW_TYPE_DATE64: - date64_builder = builder - if value_t == BSON_TYPE_DATE_TIME: - date64_builder.append_raw(bson_iter_date_time(&doc_iter)) - else: - date64_builder.append_null() - elif ftype == BSON_TYPE_DATE_TIME: - datetime_builder = builder - if value_t == BSON_TYPE_DATE_TIME: - datetime_builder.append_raw(bson_iter_date_time(&doc_iter)) - else: - datetime_builder.append_null() - elif ftype == BSON_TYPE_BOOL: - bool_builder = builder - if value_t == BSON_TYPE_BOOL: - bool_builder.append_raw(bson_iter_bool(&doc_iter)) - else: - bool_builder.append_null() - elif ftype == BSON_TYPE_DOCUMENT: - doc_builder = builder - if value_t == BSON_TYPE_DOCUMENT: - bson_iter_document(&doc_iter, &val_buf_len, &val_buf) - if val_buf_len <= 0: - raise ValueError("Subdocument is invalid") - doc_builder.append_raw(val_buf, val_buf_len) - else: - doc_builder.append_null() - elif ftype == BSON_TYPE_ARRAY: - list_builder = builder - if value_t == BSON_TYPE_ARRAY: - bson_iter_array(&doc_iter, &val_buf_len, &val_buf) - if val_buf_len <= 0: - raise ValueError("Subarray is invalid") - list_builder.append_raw(val_buf, val_buf_len) - else: - list_builder.append_null() - elif ftype == BSON_TYPE_BINARY: - binary_builder = builder - if value_t == BSON_TYPE_BINARY: - bson_iter_binary (&doc_iter, &subtype, - &val_buf_len, &val_buf) - if subtype != binary_builder._subtype: - binary_builder.append_null() - else: - binary_builder.append_raw(val_buf, val_buf_len) - elif ftype == ARROW_TYPE_NULL: - null_builder = builder - null_builder.append_null() - else: - raise PyMongoArrowError('unknown ftype {}'.format(ftype)) - - # Append nulls as needed to builders to account for any missing - # field(s). - count += 1 - it = builder_map.begin() - while it != builder_map.end(): - builder = <_ArrayBuilderBase>(dereference(it).second) - if len(builder) != count: - builder.append_null() - preincrement(it) - - # Any missing fields that are left must be null fields. - it = missing_builders.begin() - while it != missing_builders.end(): - builder = NullBuilder() - context.builder_map[key] = builder - null_builder = builder - for _ in range(count): - null_builder.append_null() - preincrement(it) - - finally: - bson_reader_destroy(stream_reader) - - -# Builders cdef class _ArrayBuilderBase: - cdef uint8_t type_marker + cdef: + public uint8_t type_marker - cpdef append_values(self, values): + def append_values(self, values): for value in values: if value is None or value is np.nan: self.append_null() else: self.append(value) + def append(self, value): + """Interface to append a python value to the builder. + """ + cdef bson_reader_t* stream_reader = NULL + cdef const bson_t * doc = NULL + cdef bson_iter_t doc_iter + + data = bson.encode(dict(data=value)) + stream_reader = bson_reader_new_from_data(data, len(data)) + doc = bson_reader_read_safe(stream_reader) + if doc == NULL: + raise ValueError("Could not append", value) + if not bson_iter_init(&doc_iter, doc): + raise InvalidBSON("Could not read BSON document") + while bson_iter_next(&doc_iter): + bson_iter_key(&doc_iter) + value_t = bson_iter_type(&doc_iter) + status = self.append_raw(&doc_iter, value_t) + if not status.ok(): + raise ValueError("Could not append raw value of type", value_t) + + cdef CStatus append_raw(self, bson_iter_t * doc_iter, bson_type_t value_t): + pass + + cdef shared_ptr[CArrayBuilder] get_builder(self): + pass -cdef class StringBuilder(_ArrayBuilderBase): - cdef: - shared_ptr[CStringBuilder] builder + def __len__(self): + return self.length() - def __cinit__(self, MemoryPool memory_pool=None): - cdef CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) - self.builder.reset(new CStringBuilder(pool)) - self.type_marker = BSON_TYPE_UTF8 + cpdef append_null(self): + cdef CStatus status = self.append_null_raw() + if not status.ok(): + raise ValueError("Could not append null value") - cdef append_raw(self, const char * value, uint32_t str_len): - self.builder.get().Append(value, str_len) + cpdef void append_nulls(self, uint64_t count): + for _ in range(count): + self.append_null() - cpdef append_null(self): - self.builder.get().AppendNull() + cdef CStatus append_null_raw(self): + return self.get_builder().get().AppendNull() - cpdef append(self, value): - value = tobytes(value) - self.append_raw(value, len(value)) + cdef CStatus append_nulls_raw(self, uint64_t count): + cdef CStatus status + for _ in range(count): + status = self.append_null_raw() + if not status.ok(): + return status - def __len__(self): - return self.builder.get().length() + cpdef uint64_t length(self): + return self.get_builder().get().length() - cpdef finish(self): + def finish(self): cdef shared_ptr[CArray] out + cdef CStatus status + cdef shared_ptr[CArrayBuilder] builder = self.get_builder() with nogil: - self.builder.get().Finish(&out) + status = builder.get().Finish(&out) + if not status.ok(): + raise ValueError("Failed to convert value to array") return pyarrow_wrap_array(out) - cdef shared_ptr[CStringBuilder] unwrap(self): - return self.builder + +cdef class StringBuilder(_ArrayBuilderBase): + cdef: + shared_ptr[CStringBuilder] builder + + def __cinit__(self, MemoryPool memory_pool=None): + cdef CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) + self.builder.reset(new CStringBuilder(pool)) + self.type_marker = BSON_TYPE_UTF8 + + cdef CStatus append_raw(self, bson_iter_t * doc_iter, bson_type_t value_t): + cdef const char* value + cdef uint32_t str_len + if value_t == BSON_TYPE_UTF8: + value = bson_iter_utf8(doc_iter, &str_len) + return self.builder.get().Append(value, str_len) + return self.builder.get().AppendNull() + + cdef shared_ptr[CArrayBuilder] get_builder(self): + return self.builder cdef class CodeBuilder(StringBuilder): @@ -498,16 +348,23 @@ cdef class CodeBuilder(StringBuilder): self.builder.reset(new CStringBuilder(pool)) self.type_marker = BSON_TYPE_CODE - cpdef finish(self): - cdef shared_ptr[CArray] out - with nogil: - self.builder.get().Finish(&out) - return pyarrow_wrap_array(out).cast(CodeType()) + cdef CStatus append_raw(self, bson_iter_t * doc_iter, bson_type_t value_t): + cdef const char * bson_str + cdef uint32_t str_len + if value_t == BSON_TYPE_CODE: + bson_str = bson_iter_code(doc_iter, &str_len) + return self.builder.get().Append(bson_str, str_len) + return self.builder.get().AppendNull() + + cdef shared_ptr[CArrayBuilder] get_builder(self): + return self.builder + + def finish(self): + return super().finish().cast(CodeType()) cdef class ObjectIdBuilder(_ArrayBuilderBase): - cdef: - shared_ptr[CFixedSizeBinaryBuilder] builder + cdef shared_ptr[CFixedSizeBinaryBuilder] builder def __cinit__(self, MemoryPool memory_pool=None): cdef shared_ptr[CDataType] dtype = fixed_size_binary(12) @@ -515,125 +372,103 @@ cdef class ObjectIdBuilder(_ArrayBuilderBase): self.builder.reset(new CFixedSizeBinaryBuilder(dtype, pool)) self.type_marker = BSON_TYPE_OID - cdef append_raw(self, const bson_oid_t * value): - self.builder.get().Append(value.bytes) - - cpdef append(self, value): - self.builder.get().Append(value) + cdef CStatus append_raw(self, bson_iter_t * doc_iter, bson_type_t value_t): + if value_t == BSON_TYPE_OID: + return self.builder.get().Append(bson_iter_oid(doc_iter).bytes) + return self.builder.get().AppendNull() - cpdef append_null(self): - self.builder.get().AppendNull() - - def __len__(self): - return self.builder.get().length() - - cpdef finish(self): - cdef shared_ptr[CArray] out - with nogil: - self.builder.get().Finish(&out) - return pyarrow_wrap_array(out).cast(ObjectIdType()) + cdef shared_ptr[CArrayBuilder] get_builder(self): + return self.builder - cdef shared_ptr[CFixedSizeBinaryBuilder] unwrap(self): - return self.builder + def finish(self): + return super().finish().cast(ObjectIdType()) cdef class Int32Builder(_ArrayBuilderBase): - cdef: - shared_ptr[CInt32Builder] builder + cdef shared_ptr[CInt32Builder] builder def __cinit__(self, MemoryPool memory_pool=None): cdef CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) self.builder.reset(new CInt32Builder(pool)) self.type_marker = BSON_TYPE_INT32 - cdef append_raw(self, int32_t value): - self.builder.get().Append(value) - - cpdef append(self, value): - self.builder.get().Append(value) - - cpdef append_null(self): - self.builder.get().AppendNull() - - def __len__(self): - return self.builder.get().length() - - cpdef finish(self): - cdef shared_ptr[CArray] out - with nogil: - self.builder.get().Finish(&out) - return pyarrow_wrap_array(out) - - cdef shared_ptr[CInt32Builder] unwrap(self): - return self.builder + cdef CStatus append_raw(self, bson_iter_t * doc_iter, bson_type_t value_t) except *: + cdef double dvalue + cdef int64_t ivalue + + if (value_t == BSON_TYPE_INT32 or value_t == BSON_TYPE_BOOL or value_t == BSON_TYPE_INT64): + # Check for overflow errors. + ivalue = bson_iter_as_int64(doc_iter) + if ivalue > INT_MAX or ivalue < INT_MIN: + raise OverflowError("Overflowed Int32 value") + return self.builder.get().Append(ivalue) + if value_t == BSON_TYPE_DOUBLE: + # Treat nan as null. + dvalue = bson_iter_as_double(doc_iter) + if isnan(dvalue): + return self.builder.get().AppendNull() + # Check for overflow errors. + ivalue = bson_iter_as_int64(doc_iter) + if ivalue > INT_MAX or ivalue < INT_MIN: + raise OverflowError("Overflowed Int32 value") + return self.builder.get().Append(ivalue) + return self.builder.get().AppendNull() + + cdef shared_ptr[CArrayBuilder] get_builder(self): + return self.builder cdef class Int64Builder(_ArrayBuilderBase): - cdef: - shared_ptr[CInt64Builder] builder + cdef shared_ptr[CInt64Builder] builder def __cinit__(self, MemoryPool memory_pool=None): cdef CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) self.builder.reset(new CInt64Builder(pool)) self.type_marker = BSON_TYPE_INT64 - cdef append_raw(self, int64_t value): - self.builder.get().Append(value) + cdef CStatus append_raw(self, bson_iter_t * doc_iter, bson_type_t value_t): + cdef double dvalue - cpdef append(self, value): - self.builder.get().Append(value) + if (value_t == BSON_TYPE_INT64 or + value_t == BSON_TYPE_BOOL or + value_t == BSON_TYPE_INT32): + return self.builder.get().Append(bson_iter_as_int64(doc_iter)) + if value_t == BSON_TYPE_DOUBLE: + # Treat nan as null. + dvalue = bson_iter_as_double(doc_iter) + if isnan(dvalue): + return self.builder.get().AppendNull() + return self.builder.get().Append(bson_iter_as_int64(doc_iter)) + return self.builder.get().AppendNull() - cpdef append_null(self): - self.builder.get().AppendNull() - - def __len__(self): - return self.builder.get().length() - - cpdef finish(self): - cdef shared_ptr[CArray] out - with nogil: - self.builder.get().Finish(&out) - return pyarrow_wrap_array(out) - - cdef shared_ptr[CInt64Builder] unwrap(self): - return self.builder + cdef shared_ptr[CArrayBuilder] get_builder(self): + return self.builder cdef class DoubleBuilder(_ArrayBuilderBase): - cdef: - shared_ptr[CDoubleBuilder] builder + cdef shared_ptr[CDoubleBuilder] builder def __cinit__(self, MemoryPool memory_pool=None): cdef CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) self.builder.reset(new CDoubleBuilder(pool)) self.type_marker = BSON_TYPE_DOUBLE - cdef append_raw(self, double value): - self.builder.get().Append(value) + cdef CStatus append_raw(self, bson_iter_t * doc_iter, bson_type_t value_t): + if (value_t == BSON_TYPE_DOUBLE or + value_t == BSON_TYPE_BOOL or + value_t == BSON_TYPE_INT32 or + value_t == BSON_TYPE_INT64): + return self.builder.get().Append(bson_iter_as_double(doc_iter)) + return self.builder.get().AppendNull() - cpdef append(self, value): - self.builder.get().Append(value) - - cpdef append_null(self): - self.builder.get().AppendNull() - - def __len__(self): - return self.builder.get().length() - - cpdef finish(self): - cdef shared_ptr[CArray] out - with nogil: - self.builder.get().Finish(&out) - return pyarrow_wrap_array(out) - - cdef shared_ptr[CDoubleBuilder] unwrap(self): - return self.builder + cdef shared_ptr[CArrayBuilder] get_builder(self): + return self.builder cdef class DatetimeBuilder(_ArrayBuilderBase): cdef: - shared_ptr[CTimestampBuilder] builder TimestampType dtype + shared_ptr[CTimestampBuilder] builder def __cinit__(self, TimestampType dtype=timestamp('ms'), MemoryPool memory_pool=None): @@ -647,386 +482,223 @@ cdef class DatetimeBuilder(_ArrayBuilderBase): pyarrow_unwrap_data_type(self.dtype), pool)) self.type_marker = BSON_TYPE_DATE_TIME - cdef append_raw(self, int64_t value): - self.builder.get().Append(value) - - cpdef append(self, value): - self.builder.get().Append(value) - - cpdef append_null(self): - self.builder.get().AppendNull() - - def __len__(self): - return self.builder.get().length() - @property def unit(self): return self.dtype - cpdef finish(self): - cdef shared_ptr[CArray] out - with nogil: - self.builder.get().Finish(&out) - return pyarrow_wrap_array(out) + cdef CStatus append_raw(self, bson_iter_t * doc_iter, bson_type_t value_t): + if value_t == BSON_TYPE_DATE_TIME: + return self.builder.get().Append(bson_iter_date_time(doc_iter)) + return self.builder.get().AppendNull() - cdef shared_ptr[CTimestampBuilder] unwrap(self): - return self.builder + cdef shared_ptr[CArrayBuilder] get_builder(self): + return self.builder cdef class Date64Builder(_ArrayBuilderBase): cdef: - shared_ptr[CDate64Builder] builder DataType dtype + shared_ptr[CDate64Builder] builder def __cinit__(self, MemoryPool memory_pool=None): cdef CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) self.builder.reset(new CDate64Builder(pool)) self.type_marker = ARROW_TYPE_DATE64 - cdef append_raw(self, int64_t value): - self.builder.get().Append(value) - - cpdef append(self, value): - self.builder.get().Append(value) - - cpdef append_null(self): - self.builder.get().AppendNull() - - def __len__(self): - return self.builder.get().length() + cdef CStatus append_raw(self, bson_iter_t * doc_iter, bson_type_t value_t): + if value_t == BSON_TYPE_DATE_TIME: + return self.builder.get().Append(bson_iter_date_time(doc_iter)) + return self.builder.get().AppendNull() @property def unit(self): return self.dtype - cpdef finish(self): - cdef shared_ptr[CArray] out - with nogil: - self.builder.get().Finish(&out) - return pyarrow_wrap_array(out) + cdef shared_ptr[CArrayBuilder] get_builder(self): + return self.builder - cdef shared_ptr[CDate64Builder] unwrap(self): - return self.builder cdef class Date32Builder(_ArrayBuilderBase): cdef: - shared_ptr[CDate32Builder] builder DataType dtype + shared_ptr[CDate32Builder] builder def __cinit__(self, MemoryPool memory_pool=None): cdef CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) self.builder.reset(new CDate32Builder(pool)) self.type_marker = ARROW_TYPE_DATE32 - cdef append_raw(self, int64_t value): - # Convert from milliseconds to days (1000*60*60*24) - cdef int32_t seconds_val = value // 86400000 - self.builder.get().Append(seconds_val) - - cpdef append(self, value): - self.builder.get().Append(value) + cdef CStatus append_raw(self, bson_iter_t * doc_iter, bson_type_t value_t): + cdef int64_t value + cdef int32_t seconds_val - cpdef append_null(self): - self.builder.get().AppendNull() - - def __len__(self): - return self.builder.get().length() + if value_t == BSON_TYPE_DATE_TIME: + value = bson_iter_date_time(doc_iter) + # Convert from milliseconds to days (1000*60*60*24) + seconds_val = value // 86400000 + return self.builder.get().Append(seconds_val) + return self.builder.get().AppendNull() @property def unit(self): return self.dtype - cpdef finish(self): - cdef shared_ptr[CArray] out - with nogil: - self.builder.get().Finish(&out) - return pyarrow_wrap_array(out) - - cdef shared_ptr[CDate32Builder] unwrap(self): - return self.builder - + cdef shared_ptr[CArrayBuilder] get_builder(self): + return self.builder cdef class NullBuilder(_ArrayBuilderBase): - cdef: - shared_ptr[CNullBuilder] builder + cdef shared_ptr[CArrayBuilder] builder def __cinit__(self, MemoryPool memory_pool=None): cdef CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) self.builder.reset(new CNullBuilder(pool)) self.type_marker = ARROW_TYPE_NULL - cdef append_raw(self, void* value): - self.builder.get().AppendNull() - - cpdef append(self, value): - self.builder.get().AppendNull() - - cpdef append_null(self): - self.builder.get().AppendNull() - - def __len__(self): - return self.builder.get().length() - - cpdef finish(self): - cdef shared_ptr[CArray] out - with nogil: - self.builder.get().Finish(&out) - return pyarrow_wrap_array(out) + cdef CStatus append_raw(self, bson_iter_t * doc_iter, bson_type_t value_t): + return self.builder.get().AppendNull() - cdef shared_ptr[CNullBuilder] unwrap(self): - return self.builder + cdef shared_ptr[CArrayBuilder] get_builder(self): + return self.builder cdef class BoolBuilder(_ArrayBuilderBase): - cdef: - shared_ptr[CBooleanBuilder] builder + cdef shared_ptr[CBooleanBuilder] builder def __cinit__(self, MemoryPool memory_pool=None): cdef CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) self.builder.reset(new CBooleanBuilder(pool)) self.type_marker = BSON_TYPE_BOOL - cdef append_raw(self, cbool value): - self.builder.get().Append(value) - - cpdef append(self, cbool value): - self.builder.get().Append(value) - - cpdef append_null(self): - self.builder.get().AppendNull() - - def __len__(self): - return self.builder.get().length() - - cpdef finish(self): - cdef shared_ptr[CArray] out - with nogil: - self.builder.get().Finish(&out) - return pyarrow_wrap_array(out) - - cdef shared_ptr[CBooleanBuilder] unwrap(self): - return self.builder + cdef CStatus append_raw(self, bson_iter_t * doc_iter, bson_type_t value_t): + if value_t == BSON_TYPE_BOOL: + return self.builder.get().Append(bson_iter_bool(doc_iter)) + return self.builder.get().AppendNull() + cdef shared_ptr[CArrayBuilder] get_builder(self): + return self.builder cdef class Decimal128Builder(_ArrayBuilderBase): - cdef: - shared_ptr[CFixedSizeBinaryBuilder] builder + cdef shared_ptr[CFixedSizeBinaryBuilder] builder + cdef uint8_t supported def __cinit__(self, MemoryPool memory_pool=None): cdef shared_ptr[CDataType] dtype = fixed_size_binary(16) cdef CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) self.builder.reset(new CFixedSizeBinaryBuilder(dtype, pool)) self.type_marker = BSON_TYPE_DECIMAL128 + if sys.byteorder == 'little': + self.supported = 1 + else: + self.supported = 0 - cdef append_raw(self, uint8_t * buf): - self.builder.get().Append(buf) + cdef CStatus append_raw(self, bson_iter_t * doc_iter, bson_type_t value_t): + cdef uint8_t dec128_buf[16] + cdef bson_decimal128_t dec128 - cpdef append(self, value): - self.builder.get().Append(value) + if self.supported == 0: + # We do not support big-endian systems. + return self.builder.get().AppendNull() - cpdef append_null(self): - self.builder.get().AppendNull() + if value_t == BSON_TYPE_DECIMAL128: + bson_iter_decimal128(doc_iter, &dec128) + memcpy(dec128_buf, &dec128.low, 8); + memcpy(dec128_buf + 8, &dec128.high, 8) + return self.builder.get().Append(dec128_buf) + return self.builder.get().AppendNull() - def __len__(self): - return self.builder.get().length() + cdef shared_ptr[CArrayBuilder] get_builder(self): + return self.builder - cpdef finish(self): - cdef shared_ptr[CArray] out - with nogil: - self.builder.get().Finish(&out) - return pyarrow_wrap_array(out).cast(Decimal128Type_()) - - cdef shared_ptr[CFixedSizeBinaryBuilder] unwrap(self): - return self.builder - - -cdef object get_field_builder(object field, object tzinfo): - """"Find the appropriate field builder given a pyarrow field""" - cdef object field_builder - cdef DataType field_type - if isinstance(field, DataType): - field_type = field - else: - field_type = field.type - if _atypes.is_int32(field_type): - field_builder = Int32Builder() - elif _atypes.is_int64(field_type): - field_builder = Int64Builder() - elif _atypes.is_float64(field_type): - field_builder = DoubleBuilder() - elif _atypes.is_timestamp(field_type): - if tzinfo and field_type.tz is None: - field_type = timestamp(field_type.unit, tz=tzinfo) - field_builder = DatetimeBuilder(field_type) - elif _atypes.is_string(field_type): - field_builder = StringBuilder() - elif _atypes.is_large_string(field_type): - field_builder = StringBuilder() - elif _atypes.is_boolean(field_type): - field_builder = BoolBuilder() - elif _atypes.is_struct(field_type): - field_builder = DocumentBuilder(field_type, tzinfo) - elif _atypes.is_list(field_type): - field_builder = ListBuilder(field_type, tzinfo) - elif _atypes.is_large_list(field_type): - field_builder = ListBuilder(field_type, tzinfo) - elif _atypes.is_null(field_type): - field_builder = NullBuilder() - elif getattr(field_type, '_type_marker') == _BsonArrowTypes.objectid: - field_builder = ObjectIdBuilder() - elif getattr(field_type, '_type_marker') == _BsonArrowTypes.decimal128: - field_builder = Decimal128Builder() - elif getattr(field_type, '_type_marker') == _BsonArrowTypes.binary: - field_builder = BinaryBuilder(field_type.subtype) - else: - field_builder = StringBuilder() - return field_builder + def finish(self): + return super().finish().cast(Decimal128Type_()) -cdef class DocumentBuilder(_ArrayBuilderBase): +cdef class BinaryBuilder(_ArrayBuilderBase): cdef: - shared_ptr[CStructBuilder] builder - object dtype - object context + uint8_t _subtype + shared_ptr[CStringBuilder] builder - def __cinit__(self, StructType dtype, tzinfo=None, MemoryPool memory_pool=None): - cdef StringBuilder field_builder - cdef vector[shared_ptr[CArrayBuilder]] c_field_builders + def __cinit__(self, uint8_t subtype, MemoryPool memory_pool=None): cdef CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) - - self.dtype = dtype - if not _atypes.is_struct(dtype): - raise ValueError("dtype must be a struct()") - - self.context = context = PyMongoArrowContext(None, {}) - context.tzinfo = tzinfo - builder_map = context.builder_map - - for field in dtype: - field_builder = get_field_builder(field, tzinfo) - builder_map[field.name.encode('utf-8')] = field_builder - c_field_builders.push_back(field_builder.builder) - - self.builder.reset(new CStructBuilder(pyarrow_unwrap_data_type(dtype), pool, c_field_builders)) - self.type_marker = BSON_TYPE_DOCUMENT + self._subtype = subtype + self.builder.reset(new CStringBuilder(pool)) + self.type_marker = BSON_TYPE_BINARY @property - def dtype(self): - return self.dtype + def subtype(self): + return self._subtype - cdef append_raw(self, const uint8_t * buf, size_t length): - # Populate the child builders. - process_raw_bson_stream(buf, length, self.context, None) - # Append an element to the Struct. "All child-builders' Append method - # must be called independently to maintain data-structure consistency." - # Pass "true" for is_valid. - self.builder.get().Append(True) + cdef CStatus append_raw(self, bson_iter_t * doc_iter, bson_type_t value_t): + cdef const char * val_buf + cdef uint32_t val_buf_len + cdef bson_subtype_t subtype - cpdef append(self, value): - if not isinstance(value, bytes): - value = bson.encode(value) - self.append_raw(value, len(value)) + if value_t == BSON_TYPE_BINARY: + bson_iter_binary(doc_iter, &subtype, &val_buf_len, &val_buf) + if subtype != self._subtype: + return self.builder.get().AppendNull() + return self.builder.get().Append(val_buf, val_buf_len) + return self.builder.get().AppendNull() - cpdef append_null(self): - self.builder.get().AppendNull() + cdef shared_ptr[CArrayBuilder] get_builder(self): + return self.builder - def __len__(self): - return self.builder.get().length() + def finish(self): + return super().finish().cast(BinaryType(self._subtype)) - cpdef finish(self): - cdef shared_ptr[CArray] out - with nogil: - self.builder.get().Finish(&out) - return pyarrow_wrap_array(out) - cdef shared_ptr[CStructBuilder] unwrap(self): - return self.builder - - -cdef class ListBuilder(_ArrayBuilderBase): +cdef class DocumentBuilder(_ArrayBuilderBase): + """The document builder stores a map of field names that can be retrieved as a set.""" cdef: - shared_ptr[CListBuilder] builder - _ArrayBuilderBase child_builder - object dtype - object context + dict field_map + int64_t count - def __cinit__(self, DataType dtype, tzinfo=None, MemoryPool memory_pool=None, value_builder=None): - cdef StringBuilder field_builder - cdef CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) - cdef shared_ptr[CArrayBuilder] grandchild_builder - self.dtype = dtype - if not (_atypes.is_list(dtype) or _atypes.is_large_list(dtype)): - raise ValueError("dtype must be a list_() or large_list()") - self.context = context = PyMongoArrowContext(None, {}) - self.context.tzinfo = tzinfo - field_builder = get_field_builder(self.dtype.value_type, tzinfo) - grandchild_builder = field_builder.builder - self.child_builder = field_builder - self.builder.reset(new CListBuilder(pool, grandchild_builder, pyarrow_unwrap_data_type(dtype))) - self.type_marker = BSON_TYPE_ARRAY + def __cinit__(self): + self.type_marker = BSON_TYPE_DOCUMENT + self.field_map = dict() + cdef CStatus append_raw(self, bson_iter_t * doc_iter, bson_type_t value_t): + self.count += 1 + return CStatus_OK() - @property - def dtype(self): - return self.dtype + cpdef uint64_t length(self): + return self.count - cdef append_raw(self, const uint8_t * buf, size_t length): - # Append an element to the array. - # arr_value_builder will be appended to by process_bson_stream. - self.builder.get().Append(True) - process_raw_bson_stream(buf, length, self.context, self.child_builder) + cdef CStatus append_null_raw(self): + self.count += 1 + return CStatus_OK() - cpdef append(self, value): - if not isinstance(value, bytes): - value = bson.encode(value) - self.append_raw(value, len(value)) + cpdef void add_field(self, cstring field_name): + self.field_map[field_name] = 1 - cpdef append_null(self): - self.builder.get().AppendNull() + def finish(self): + # Fields must be in order if we were given a schema. + return list(f.decode('utf-8') for f in self.field_map) - def __len__(self): - return self.builder.get().length() - cpdef finish(self): - cdef shared_ptr[CArray] out - with nogil: - self.builder.get().Finish(&out) - return pyarrow_wrap_array(out) - - cdef shared_ptr[CListBuilder] unwrap(self): - return self.builder - - -cdef class BinaryBuilder(_ArrayBuilderBase): +cdef class ListBuilder(_ArrayBuilderBase): + """The list builder stores an int32 list of offsets and a counter with the current value.""" cdef: - shared_ptr[CBinaryBuilder] builder - uint8_t _subtype - - def __cinit__(self, uint8_t subtype): - self._subtype = subtype - self.builder.reset(new CBinaryBuilder()) - self.type_marker = BSON_TYPE_BINARY - - @property - def subtype(self): - return self._subtype + int64_t count + shared_ptr[CInt32Builder] builder - cdef append_raw(self, const char * value, uint32_t str_len): - self.builder.get().Append(value, str_len) + def __cinit__(self, MemoryPool memory_pool=None): + cdef CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) + self.builder.reset(new CInt32Builder(pool)) + self.count = 0 + self.type_marker = BSON_TYPE_ARRAY - cpdef append_null(self): - self.builder.get().AppendNull() + cdef CStatus append_raw(self, bson_iter_t * doc_iter, bson_type_t value_t): + return self.builder.get().Append(self.count) - cpdef append(self, value): - self.append_raw(value, len(value)) + cpdef void append_count(self): + self.count += 1 - def __len__(self): - return self.builder.get().length() + cdef CStatus append_null_raw(self): + return self.builder.get().Append(self.count) - cpdef finish(self): - cdef shared_ptr[CArray] out - with nogil: - self.builder.get().Finish(&out) - return pyarrow_wrap_array(out).cast(BinaryType(self._subtype)) + cdef shared_ptr[CArrayBuilder] get_builder(self): + return self.builder - cdef shared_ptr[CBinaryBuilder] unwrap(self): - return self.builder + def finish(self): + self.builder.get().Append(self.count) + return super().finish() diff --git a/bindings/python/pymongoarrow/libarrow.pxd b/bindings/python/pymongoarrow/libarrow.pxd index 81efa91a..9f0efa20 100644 --- a/bindings/python/pymongoarrow/libarrow.pxd +++ b/bindings/python/pymongoarrow/libarrow.pxd @@ -18,7 +18,7 @@ from libcpp.vector cimport vector from libc.stdint cimport int32_t, uint8_t from pyarrow.lib cimport * -from pyarrow.includes.libarrow cimport (CStatus, CMemoryPool) # noqa: E211 +from pyarrow.includes.libarrow cimport (CStatus, CStatus_OK, CMemoryPool) # noqa: E211 # libarrow type wrappings @@ -51,3 +51,9 @@ cdef extern from "arrow/builder.h" namespace "arrow" nogil: cdef extern from "arrow/type_fwd.h" namespace "arrow" nogil: shared_ptr[CDataType] fixed_size_binary(int32_t byte_width) + + +# Values used to check for overflow errors. +cdef extern from "limits.h": + cdef int INT_MAX + cdef int INT_MIN diff --git a/bindings/python/pymongoarrow/types.py b/bindings/python/pymongoarrow/types.py index 7901a81c..fa52b2c7 100644 --- a/bindings/python/pymongoarrow/types.py +++ b/bindings/python/pymongoarrow/types.py @@ -14,6 +14,7 @@ import enum from datetime import datetime +import bson import numpy as np import pyarrow as pa import pyarrow.types as _atypes @@ -42,21 +43,22 @@ class _BsonArrowTypes(enum.Enum): - datetime = 1 - double = 2 - int32 = 3 - int64 = 4 - objectid = 5 - string = 6 - bool = 7 - decimal128 = 8 - document = 9 - array = 10 - binary = 11 - code = 12 - date32 = 13 - date64 = 14 - null = 15 + datetime = ord(bson.BSONDAT) + double = ord(bson.BSONNUM) + int32 = ord(bson.BSONINT) + int64 = ord(bson.BSONLON) + objectid = ord(bson.BSONOID) + string = ord(bson.BSONSTR) + bool = ord(bson.BSONBOO) + decimal128 = ord(bson.BSONDEC) + document = ord(bson.BSONOBJ) + array = ord(bson.BSONARR) + binary = ord(bson.BSONBIN) + code = ord(bson.BSONCOD) + # Keep in sync with constants in lib.pyx + date32 = 100 + date64 = 101 + null = 102 # Custom Extension Types. @@ -258,23 +260,23 @@ def get_numpy_type(type): _TYPE_CHECKER_TO_INTERNAL_TYPE = { - _atypes.is_int32: _BsonArrowTypes.int32, - _atypes.is_int64: _BsonArrowTypes.int64, - _atypes.is_float64: _BsonArrowTypes.double, - _atypes.is_timestamp: _BsonArrowTypes.datetime, - _atypes.is_null: _BsonArrowTypes.null, - _is_objectid: _BsonArrowTypes.objectid, - _is_decimal128: _BsonArrowTypes.decimal128, - _is_binary: _BsonArrowTypes.binary, - _is_code: _BsonArrowTypes.code, - _atypes.is_string: _BsonArrowTypes.string, - _atypes.is_boolean: _BsonArrowTypes.bool, - _atypes.is_struct: _BsonArrowTypes.document, - _atypes.is_list: _BsonArrowTypes.array, - _atypes.is_date32: _BsonArrowTypes.date32, - _atypes.is_date64: _BsonArrowTypes.date64, - _atypes.is_large_string: _BsonArrowTypes.string, - _atypes.is_large_list: _BsonArrowTypes.array, + _atypes.is_int32: _BsonArrowTypes.int32.value, + _atypes.is_int64: _BsonArrowTypes.int64.value, + _atypes.is_float64: _BsonArrowTypes.double.value, + _atypes.is_timestamp: _BsonArrowTypes.datetime.value, + _atypes.is_null: _BsonArrowTypes.null.value, + _is_objectid: _BsonArrowTypes.objectid.value, + _is_decimal128: _BsonArrowTypes.decimal128.value, + _is_binary: _BsonArrowTypes.binary.value, + _is_code: _BsonArrowTypes.code.value, + _atypes.is_string: _BsonArrowTypes.string.value, + _atypes.is_boolean: _BsonArrowTypes.bool.value, + _atypes.is_struct: _BsonArrowTypes.document.value, + _atypes.is_list: _BsonArrowTypes.array.value, + _atypes.is_date32: _BsonArrowTypes.date32.value, + _atypes.is_date64: _BsonArrowTypes.date64.value, + _atypes.is_large_string: _BsonArrowTypes.string.value, + _atypes.is_large_list: _BsonArrowTypes.array.value, } @@ -310,7 +312,7 @@ def _get_internal_typemap(typemap): for fname, ftype in typemap.items(): for checker, internal_id in _TYPE_CHECKER_TO_INTERNAL_TYPE.items(): if checker(ftype): - internal_typemap[fname] = internal_id + internal_typemap[fname] = (internal_id, ftype) break if fname not in internal_typemap: diff --git a/bindings/python/test/test_arrow.py b/bindings/python/test/test_arrow.py index 3c22fb10..cc831518 100644 --- a/bindings/python/test/test_arrow.py +++ b/bindings/python/test/test_arrow.py @@ -23,8 +23,6 @@ import pymongo from bson import Binary, Code, CodecOptions, Decimal128, ObjectId from pyarrow import ( - DataType, - FixedSizeBinaryType, Table, bool_, csv, @@ -405,7 +403,7 @@ def inner(i): if use_none: inner_dict["null"] = None if nested_elem: - inner_dict["list"] = [nested_elem] + inner_dict["list_of_objs"] = [nested_elem] return inner_dict if nested_elem: @@ -480,6 +478,18 @@ def test_auto_schema_nested(self): for name in out.column_names: self.assertEqual(data[name], out[name].cast(data[name].type)) + def test_auto_schema_nested_null(self): + # Create table with random data of various types. + _, data = self._create_nested_data(use_none=True) + + self.coll.drop() + res = write(self.coll, data) + self.assertEqual(len(data), res.raw_result["insertedCount"]) + for func in [find_arrow_all, aggregate_arrow_all]: + out = func(self.coll, {} if func == find_arrow_all else []).drop(["_id"]) + for name in out.column_names: + self.assertEqual(data[name], out[name].cast(data[name].type)) + def test_schema_nested_null(self): schema, data = self._create_nested_data(use_none=True) @@ -514,7 +524,13 @@ def test_auto_schema_first_list_null(self): {"a": ["str"]}, {"a": []}, ] - expected = pa.Table.from_pylist(docs) + expected = pa.Table.from_pylist( + [ + {"a": []}, + {"a": ["str"]}, + {"a": []}, + ] + ) self._test_auto_schema_list(docs, expected) def test_auto_schema_first_list_empty(self): @@ -525,7 +541,7 @@ def test_auto_schema_first_list_empty(self): ] expected = pa.Table.from_pylist( [ - {"a": None}, # TODO: We incorrectly set the first empty list to null. + {"a": []}, {"a": ["str"]}, {"a": []}, ] @@ -538,20 +554,30 @@ def test_auto_schema_first_list_element_null(self): {"a": [None, None, "str"]}, # Inferred schema should use the first non-null element. {"a": []}, ] - expected = pa.Table.from_pylist(docs) + expected = pa.Table.from_pylist( + [ + {"a": []}, + {"a": ["str"]}, # Inferred schema should use the first non-null element. + {"a": []}, + ] + ) self._test_auto_schema_list(docs, expected) - @unittest.expectedFailure # TODO: Our inferred value for the first a.b field differs from pyarrow's. def test_auto_schema_first_embedded_list_null(self): docs = [ {"a": {"b": None}}, {"a": {"b": ["str"]}}, {"a": {"b": []}}, ] - expected = pa.Table.from_pylist(docs) + expected = pa.Table.from_pylist( + [ + {"a": {"b": []}}, + {"a": {"b": ["str"]}}, + {"a": {"b": []}}, + ] + ) self._test_auto_schema_list(docs, expected) - @unittest.expectedFailure # TODO: Our inferred value for the first a.b field differs from pyarrow's. def test_auto_schema_first_embedded_doc_null(self): docs = [ {"a": {"b": None}}, @@ -747,18 +773,10 @@ def test_nested_bson_extension_types(self): out = find_arrow_all(self.coll, {}) obj_schema_type = out.field("obj").type - self.assertIsInstance(obj_schema_type.field("obj_id").type, FixedSizeBinaryType) - self.assertIsInstance(obj_schema_type.field("dec_128").type, FixedSizeBinaryType) - self.assertIsInstance(obj_schema_type.field("binary").type, DataType) - self.assertIsInstance(obj_schema_type.field("code").type, DataType) - - new_types = [ObjectIdType(), Decimal128Type(), BinaryType(0), CodeType()] - new_names = [f.name for f in out["obj"].type] - new_obj = out["obj"].cast(struct(zip(new_names, new_types))) - self.assertIsInstance(new_obj.type[0].type, ObjectIdType) - self.assertIsInstance(new_obj.type[1].type, Decimal128Type) - self.assertIsInstance(new_obj.type[2].type, BinaryType) - self.assertIsInstance(new_obj.type[3].type, CodeType) + self.assertIsInstance(obj_schema_type.field("obj_id").type, ObjectIdType) + self.assertIsInstance(obj_schema_type.field("dec_128").type, Decimal128Type) + self.assertIsInstance(obj_schema_type.field("binary").type, BinaryType) + self.assertIsInstance(obj_schema_type.field("code").type, CodeType) def test_large_string_type(self): """Tests pyarrow._large_string() DataType""" diff --git a/bindings/python/test/test_bson.py b/bindings/python/test/test_bson.py index 6ca03cbf..a293b900 100644 --- a/bindings/python/test/test_bson.py +++ b/bindings/python/test/test_bson.py @@ -17,7 +17,6 @@ from bson import Decimal128, Int64, InvalidBSON, encode from pymongoarrow.context import PyMongoArrowContext -from pymongoarrow.lib import process_bson_stream from pymongoarrow.schema import Schema from pymongoarrow.types import ObjectId, ObjectIdType, int64, string @@ -25,7 +24,7 @@ class TestBsonToArrowConversionBase(TestCase): def setUp(self): self.schema = Schema({"_id": ObjectId, "data": int64(), "title": string()}) - self.context = PyMongoArrowContext.from_schema(self.schema) + self.context = PyMongoArrowContext(self.schema) @staticmethod def _generate_payload(doclist): @@ -37,7 +36,7 @@ def _generate_payload(doclist): def _run_test(self, doclist, as_dict): payload = type(self)._generate_payload(doclist) - process_bson_stream(payload, self.context) + self.context.process_bson_stream(payload) table = self.context.finish() table_dict = table.to_pydict() @@ -105,13 +104,13 @@ def test_simple(self): schema = Schema({"_id": ObjectId, "data": int64(), "fake": pa.float16()}) msg = 'Unsupported data type in schema for field "fake" of type "halffloat"' with self.assertRaisesRegex(ValueError, msg): - PyMongoArrowContext.from_schema(schema) + PyMongoArrowContext(schema) class TestNonAsciiFieldName(TestBsonToArrowConversionBase): def setUp(self): self.schema = Schema({"_id": ObjectIdType(), "dätá": int64()}) - self.context = PyMongoArrowContext.from_schema(self.schema) + self.context = PyMongoArrowContext(self.schema) def test_simple(self): ids = [ObjectId() for i in range(4)] @@ -151,7 +150,7 @@ def test_object_id_type(self): class TestInt64Type(TestBsonToArrowConversionBase): def setUp(self): self.schema = Schema({"data": Int64}) - self.context = PyMongoArrowContext.from_schema(self.schema) + self.context = PyMongoArrowContext(self.schema) def test_simple(self): docs = [ @@ -166,7 +165,7 @@ def test_simple(self): class TestBooleanType(TestBsonToArrowConversionBase): def setUp(self): self.schema = Schema({"data": bool}) - self.context = PyMongoArrowContext.from_schema(self.schema) + self.context = PyMongoArrowContext(self.schema) def test_simple(self): docs = [ @@ -184,7 +183,7 @@ def test_simple(self): class TestStringType(TestBsonToArrowConversionBase): def setUp(self): self.schema = Schema({"data": str}) - self.context = PyMongoArrowContext.from_schema(self.schema) + self.context = PyMongoArrowContext(self.schema) def test_simple(self): docs = [ @@ -198,7 +197,7 @@ def test_simple(self): class TestDecimal128Type(TestBsonToArrowConversionBase): def setUp(self): self.schema = Schema({"data": Decimal128}) - self.context = PyMongoArrowContext.from_schema(self.schema) + self.context = PyMongoArrowContext(self.schema) def test_simple(self): docs = [ @@ -213,7 +212,7 @@ def test_simple(self): class TestSubdocumentType(TestBsonToArrowConversionBase): def setUp(self): self.schema = Schema({"data": dict(x=bool)}) - self.context = PyMongoArrowContext.from_schema(self.schema) + self.context = PyMongoArrowContext(self.schema) def test_simple(self): docs = [ @@ -238,7 +237,7 @@ def test_simple(self): def test_nested(self): self.schema = Schema({"data": dict(x=bool, y=dict(a=int))}) - self.context = PyMongoArrowContext.from_schema(self.schema) + self.context = PyMongoArrowContext(self.schema) docs = [ {"data": dict(x=True, y=dict(a=1))}, diff --git a/bindings/python/test/test_builders.py b/bindings/python/test/test_builders.py index d6849213..3c09fa54 100644 --- a/bindings/python/test/test_builders.py +++ b/bindings/python/test/test_builders.py @@ -11,16 +11,17 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import calendar -from datetime import date, datetime, timedelta, timezone + +from datetime import datetime, timedelta, timezone from unittest import TestCase -from bson import Binary, Code, Decimal128, ObjectId -from pyarrow import Array, bool_, field, int32, int64, list_, struct, timestamp +from bson import Binary, Code, Decimal128, ObjectId, encode +from pyarrow import Array, bool_, int32, int64, timestamp from pymongoarrow.lib import ( BinaryBuilder, BoolBuilder, + BuilderManager, CodeBuilder, Date32Builder, Date64Builder, @@ -34,7 +35,6 @@ ObjectIdBuilder, StringBuilder, ) -from pymongoarrow.types import ObjectIdType class IntBuildersTestMixin: @@ -42,13 +42,14 @@ def test_simple(self): builder = self.builder_cls() builder.append(0) builder.append_values([1, 2, 3, 4]) + builder.append("a") builder.append_null() arr = builder.finish() self.assertIsInstance(arr, Array) - self.assertEqual(arr.null_count, 1) - self.assertEqual(len(arr), 6) - self.assertEqual(arr.to_pylist(), [0, 1, 2, 3, 4, None]) + self.assertEqual(arr.null_count, 2) + self.assertEqual(len(arr), 7) + self.assertEqual(arr.to_pylist(), [0, 1, 2, 3, 4, None, None]) self.assertEqual(arr.type, self.data_type) @@ -70,36 +71,23 @@ def test_default_unit(self): builder = DatetimeBuilder() self.assertEqual(builder.unit, timestamp("ms")) - def _datetime_to_millis(self, dtm): - """Convert datetime to milliseconds since epoch UTC. - Vendored from bson.""" - if dtm.utcoffset() is not None: - dtm = dtm - dtm.utcoffset() - return int(calendar.timegm(dtm.timetuple()) * 1000 + dtm.microsecond // 1000) - - def _millis_only(self, dt): - """Convert a datetime to millisecond resolution.""" - micros = (dt.microsecond // 1000) * 1000 - return dt.replace(microsecond=micros) - def test_simple(self): self.maxDiff = None builder = DatetimeBuilder(dtype=timestamp("ms")) datetimes = [datetime.now(timezone.utc) + timedelta(days=k * 100) for k in range(5)] - builder.append(self._datetime_to_millis(datetimes[0])) - builder.append_values([self._datetime_to_millis(k) for k in datetimes[1:]]) + builder.append(datetimes[0]) + builder.append_values(datetimes[1:]) + builder.append(1) builder.append_null() arr = builder.finish() self.assertIsInstance(arr, Array) - self.assertEqual(arr.null_count, 1) - self.assertEqual(len(arr), len(datetimes) + 1) - for actual, expected in zip(arr, datetimes + [None]): + self.assertEqual(arr.null_count, 2) + self.assertEqual(len(arr), len(datetimes) + 2) + for actual, expected in zip(arr, datetimes + [None, None]): if actual.is_valid: - self.assertEqual( - actual.as_py().timetuple(), self._millis_only(expected).timetuple() - ) + self.assertEqual(actual.as_py().timetuple(), expected.timetuple()) else: self.assertIsNone(expected) self.assertEqual(arr.type, timestamp("ms")) @@ -113,30 +101,33 @@ def test_unsupported_units(self): class TestDoubleBuilder(TestCase): def test_simple(self): builder = DoubleBuilder() - builder.append(0.123) - builder.append_values([1.234, 2.345, 3.456, 4.567]) + values = [0.123, 1.234, 2.345, 3.456, 4.567, 1] + builder.append(values[0]) + builder.append_values(values[1:]) + builder.append("a") builder.append_null() arr = builder.finish() self.assertIsInstance(arr, Array) - self.assertEqual(arr.null_count, 1) - self.assertEqual(len(arr), 6) - self.assertEqual(arr.to_pylist(), [0.123, 1.234, 2.345, 3.456, 4.567, None]) + self.assertEqual(arr.null_count, 2) + self.assertEqual(len(arr), 8) + self.assertEqual(arr.to_pylist(), values + [None, None]) class TestObjectIdBuilder(TestCase): def test_simple(self): ids = [ObjectId() for i in range(5)] builder = ObjectIdBuilder() - builder.append(ids[0].binary) - builder.append_values([oid.binary for oid in ids[1:]]) + builder.append(ids[0]) + builder.append_values(ids[1:]) + builder.append(b"123456789123") builder.append_null() arr = builder.finish() self.assertIsInstance(arr, Array) - self.assertEqual(arr.null_count, 1) - self.assertEqual(len(arr), 6) - self.assertEqual(arr.to_pylist(), ids + [None]) + self.assertEqual(arr.null_count, 2) + self.assertEqual(len(arr), 7) + self.assertEqual(arr.to_pylist(), ids + [None, None]) class TestStringBuilder(TestCase): @@ -146,74 +137,105 @@ def test_simple(self): values = ["Hello world", "Καλημέρα κόσμε", "コンニチハ"] values += ["hello\u0000world"] builder = StringBuilder() - builder.append(values[0].encode("utf8")) + builder.append(values[0]) builder.append_values(values[1:]) + builder.append(b"1") builder.append_null() arr = builder.finish() self.assertIsInstance(arr, Array) - self.assertEqual(arr.null_count, 1) - self.assertEqual(len(arr), 5) - self.assertEqual(arr.to_pylist(), values + [None]) + self.assertEqual(arr.null_count, 2) + self.assertEqual(len(arr), 6) + self.assertEqual(arr.to_pylist(), values + [None, None]) class TestDocumentBuilder(TestCase): def test_simple(self): - dtype = struct([field("a", int32()), field("b", bool_())]) - builder = DocumentBuilder(dtype) - builder.append({"a": 1, "b": True}) - builder.append_values([{"a": 1, "b": False}, {"a": 2, "b": True}]) + builder = DocumentBuilder() + builder.append(dict(a=1, b=2, c=3)) + builder.add_field(b"a") + builder.add_field(b"b") + builder.add_field(b"c") builder.append_null() - arr = builder.finish() - - self.assertIsInstance(arr, Array) - self.assertEqual(arr.null_count, 1) - self.assertEqual(len(arr), 4) - self.assertEqual(arr.type, dtype) - - def test_nested(self): - sub_struct = struct([field("c", bool_()), field("d", ObjectIdType())]) - dtype = struct([field("a", int32()), field("b", sub_struct)]) - builder = DocumentBuilder(dtype) - builder.append({"a": 1, "b": {"c": True, "d": ObjectId()}}) - builder.append_values( - [ - {"a": 1, "b": {"c": False, "d": ObjectId()}}, - {"a": 2, "b": {"c": True, "d": ObjectId()}}, - {"a": 3, "b": None}, # Null - {"a": 4}, # Missing - {"a": 5, "b": {}}, # Empty - {"a": 6, "b": 1}, # Wrong type - {"a": 6, "b": {"c": 1, "d": 1}}, # Wrong field types - ] - ) - builder.append_null() - arr = builder.finish() - - self.assertIsInstance(arr, Array) - self.assertEqual(arr.null_count, 1) - self.assertEqual(len(arr), 9) + builder.append(dict(a=1, b=2)) + builder.add_field(b"a") + builder.add_field(b"b") + names = builder.finish() + assert names == ["a", "b", "c"] class TestListBuilder(TestCase): def test_simple(self): - dtype = list_(int32()) - builder = ListBuilder(dtype) - builder.append({"1": 1, "2": 3}) - builder.append_values( - [ - {"1": 1, "2": 4}, - {"1": 2}, - {"1": None}, # Null - None, - {"a": 5, "b": 1}, - ] - ) + builder = ListBuilder() + builder.append([1, 2]) + builder.append_count() + builder.append_count() + builder.append_null() + builder.append([3, 4, 5]) + builder.append_count() + builder.append_count() + builder.append_count() + builder.append_null() arr = builder.finish() + assert arr.to_pylist() == [0, 2, 2, 5, 5] - self.assertIsInstance(arr, Array) - self.assertEqual(arr.null_count, 1) - self.assertEqual(len(arr), 6) + +class TestBuilderManager(TestCase): + def test_simple(self): + manager = BuilderManager({}, False, None) + data = b"".join(encode(d) for d in [dict(a=1), dict(a=2), dict(a=None), dict(a=4)]) + manager.process_bson_stream(data, len(data)) + array_map = manager.finish() + assert list(array_map) == ["a"] + assert next(iter(array_map.values())).finish().to_pylist() == [1, 2, None, 4] + + def test_nested_object(self): + inner_values = [] + for i in range(3): + inner_values.append(dict(a=i, b="1", c=None, d=[1.4], e=ObjectId(), f=None)) + values = [] + for i in range(3): + values.append(dict(c=inner_values[i], e=ObjectId(), f=None, g=[dict(a=1)])) + values.append(dict(c=None)) + inner = inner_values[0].copy() + inner["c"] = 1.0 + values.append(dict(c=inner, e=ObjectId(), f=None, g=[])) + manager = BuilderManager({}, False, None) + data = b"".join(encode(v) for v in values) + manager.process_bson_stream(data, len(data)) + array_map = manager.finish() + for key, value in array_map.items(): + array_map[key] = value.finish() + assert sorted(array_map.keys()) == [ + "c", + "c.a", + "c.b", + "c.c", + "c.d", + "c.d[]", + "c.e", + "c.f", + "e", + "f", + "g", + "g[]", + "g[].a", + ] + # Dict has its top level keys. + assert array_map["c"] == ["a", "b", "c", "d", "e", "f"] + # Deferred nested field. + assert array_map["c.c"].to_pylist() == [None, None, None, None, 1.0] + assert array_map["f"].to_pylist() == [None, None, None, None, None] + # List with a null in the middle. + assert array_map["c.d"].to_pylist() == [0, 1, 2, 3, 3, 4] + assert array_map["c.d[]"].to_pylist() == [1.4, 1.4, 1.4, 1.4] + # Regular item with a null in the middle. + assert array_map["c.b"].to_pylist() == ["1", "1", "1", None, "1"] + # Nested object ids are object ids. + obj = array_map["c.e"].to_pylist()[0] + assert isinstance(obj, ObjectId) + # Lists can contain objects. + assert array_map["g[].a"].to_pylist() == [1, 1, 1, None, None] class TestBinaryBuilder(TestCase): @@ -222,28 +244,30 @@ def test_simple(self): builder = BinaryBuilder(10) builder.append(data[0]) builder.append_values(data[1:]) + builder.append(1) builder.append_null() arr = builder.finish() self.assertIsInstance(arr, Array) - self.assertEqual(arr.null_count, 1) - self.assertEqual(len(arr), 6) - self.assertEqual(arr.to_pylist(), data + [None]) + self.assertEqual(arr.null_count, 2) + self.assertEqual(len(arr), 7) + self.assertEqual(arr.to_pylist(), data + [None, None]) class TestDecimal128Builder(TestCase): def test_simple(self): data = [Decimal128([i, i]) for i in range(5)] builder = Decimal128Builder() - builder.append(data[0].bid) - builder.append_values([item.bid for item in data[1:]]) + builder.append(data[0]) + builder.append_values(data[1:]) + builder.append(1) builder.append_null() arr = builder.finish() self.assertIsInstance(arr, Array) - self.assertEqual(arr.null_count, 1) - self.assertEqual(len(arr), 6) - self.assertEqual(arr.to_pylist(), data + [None]) + self.assertEqual(arr.null_count, 2) + self.assertEqual(len(arr), 7) + self.assertEqual(arr.to_pylist(), data + [None, None]) class BoolBuilderTestMixin: @@ -251,13 +275,16 @@ def test_simple(self): builder = BoolBuilder() builder.append(False) builder.append_values([True, False, True, False, True, False]) + builder.append(1) builder.append_null() arr = builder.finish() self.assertIsInstance(arr, Array) - self.assertEqual(arr.null_count, 1) - self.assertEqual(len(arr), 8) - self.assertEqual(arr.to_pylist(), [False, True, False, True, False, True, False, None]) + self.assertEqual(arr.null_count, 2) + self.assertEqual(len(arr), 9) + self.assertEqual( + arr.to_pylist(), [False, True, False, True, False, True, False, None, None] + ) self.assertEqual(arr.type, self.data_type) @@ -274,51 +301,49 @@ def test_simple(self): values = ["Hello world", "Καλημέρα κόσμε", "コンニチハ"] values += ["hello\u0000world"] builder = CodeBuilder() - builder.append(values[0].encode("utf8")) - builder.append_values(values[1:]) + builder.append(Code(values[0])) + builder.append_values([Code(v) for v in values[1:]]) + builder.append("foo") builder.append_null() arr = builder.finish() codes = [Code(v) for v in values] self.assertIsInstance(arr, Array) - self.assertEqual(arr.null_count, 1) - self.assertEqual(len(arr), 5) - self.assertEqual(arr.to_pylist(), codes + [None]) + self.assertEqual(arr.null_count, 2) + self.assertEqual(len(arr), 6) + self.assertEqual(arr.to_pylist(), codes + [None, None]) class TestDate32Builder(TestCase): def test_simple(self): - epoch = date(1970, 1, 1) - values = [date(2012, 1, 1), date(2012, 1, 2), date(2014, 4, 5)] + values = [datetime(1970 + i, 1, 1) for i in range(3)] builder = Date32Builder() - builder.append(values[0].toordinal() - epoch.toordinal()) - builder.append_values([v.toordinal() - epoch.toordinal() for v in values[1:]]) + builder.append(values[0]) + builder.append_values(values[1:]) + builder.append(1) builder.append_null() arr = builder.finish() self.assertIsInstance(arr, Array) - self.assertEqual(arr.null_count, 1) - self.assertEqual(len(arr), 4) - self.assertEqual(arr.to_pylist(), values + [None]) + self.assertEqual(arr.null_count, 2) + self.assertEqual(len(arr), 5) + dates = [v.date() for v in values] + self.assertEqual(arr.to_pylist(), dates + [None, None]) class TestDate64Builder(TestCase): def test_simple(self): - def msec_since_epoch(d): - epoch = datetime(1970, 1, 1) - d = datetime.fromordinal(d.toordinal()) - diff = d - epoch - return diff.total_seconds() * 1000 - - values = [date(2012, 1, 1), date(2012, 1, 2), date(2014, 4, 5)] + values = [datetime(1970 + i, 1, 1) for i in range(3)] builder = Date64Builder() - builder.append(msec_since_epoch(values[0])) - builder.append_values([msec_since_epoch(v) for v in values[1:]]) + builder.append(values[0]) + builder.append_values(values[1:]) + builder.append(1) builder.append_null() arr = builder.finish() self.assertIsInstance(arr, Array) - self.assertEqual(arr.null_count, 1) - self.assertEqual(len(arr), 4) - self.assertEqual(arr.to_pylist(), values + [None]) + self.assertEqual(arr.null_count, 2) + self.assertEqual(len(arr), 5) + dates = [v.date() for v in values] + self.assertEqual(arr.to_pylist(), dates + [None, None]) diff --git a/bindings/python/test/test_datetime.py b/bindings/python/test/test_datetime.py index af29c0c8..abd438a5 100644 --- a/bindings/python/test/test_datetime.py +++ b/bindings/python/test/test_datetime.py @@ -52,7 +52,7 @@ def test_context_creation_fails_with_unsupported_granularity(self): for g in unsupported_granularities: schema = Schema({"_id": int32(), "data": timestamp(g)}) with self.assertRaises(TypeError): - PyMongoArrowContext.from_schema(schema) + PyMongoArrowContext(schema) def test_round_trip(self): expected = Table.from_pydict(