@@ -22,7 +22,7 @@ import sys
22
22
# Python imports
23
23
import bson
24
24
import numpy as np
25
- from pyarrow import timestamp
25
+ from pyarrow import timestamp, default_memory_pool
26
26
27
27
from pymongoarrow.errors import InvalidBSON
28
28
from pymongoarrow.types import ObjectIdType, Decimal128Type as Decimal128Type_, BinaryType, CodeType
@@ -62,22 +62,24 @@ cdef class BuilderManager:
62
62
uint64_t count
63
63
bint has_schema
64
64
object tzinfo
65
+ object pool
65
66
66
67
def __cinit__ (self , dict schema_map , bint has_schema , object tzinfo ):
67
68
self .has_schema = has_schema
68
69
self .tzinfo = tzinfo
69
70
self .count = 0
70
71
self .builder_map = {}
72
+ self .pool = default_memory_pool()
71
73
# Unpack the schema map.
72
74
for fname, (ftype, arrow_type) in schema_map.items():
73
75
name = fname.encode(' utf-8' )
74
76
# special-case initializing builders for parameterized types
75
77
if ftype == BSON_TYPE_DATE_TIME:
76
78
if tzinfo is not None and arrow_type.tz is None :
77
79
arrow_type = timestamp(arrow_type.unit, tz = tzinfo) # noqa: PLW2901
78
- self .builder_map[name] = DatetimeBuilder(dtype = arrow_type)
80
+ self .builder_map[name] = DatetimeBuilder(dtype = arrow_type, memory_pool = self .pool )
79
81
elif ftype == BSON_TYPE_BINARY:
80
- self .builder_map[name] = BinaryBuilder(arrow_type.subtype)
82
+ self .builder_map[name] = BinaryBuilder(arrow_type.subtype, memory_pool = self .pool )
81
83
else :
82
84
# We only use the doc_iter for binary arrays, which are handled already.
83
85
self .get_builder(name, ftype, < bson_iter_t * > nullptr)
@@ -100,39 +102,39 @@ cdef class BuilderManager:
100
102
if value_t == BSON_TYPE_DATE_TIME:
101
103
if self .tzinfo is not None :
102
104
arrow_type = timestamp(' ms' , tz = self .tzinfo)
103
- builder = DatetimeBuilder(dtype = arrow_type)
105
+ builder = DatetimeBuilder(dtype = arrow_type, memory_pool = self .pool )
104
106
else :
105
- builder = DatetimeBuilder()
107
+ builder = DatetimeBuilder(memory_pool = self .pool )
106
108
elif value_t == BSON_TYPE_DOCUMENT:
107
109
builder = DocumentBuilder()
108
110
elif value_t == BSON_TYPE_ARRAY:
109
- builder = ListBuilder()
111
+ builder = ListBuilder(memory_pool = self .pool )
110
112
elif value_t == BSON_TYPE_BINARY:
111
113
if doc_iter == NULL :
112
114
raise ValueError (' Did not pass a doc_iter!' )
113
115
bson_iter_binary (doc_iter, & subtype,
114
116
& val_buf_len, & val_buf)
115
- builder = BinaryBuilder(subtype)
117
+ builder = BinaryBuilder(subtype, memory_pool = self .pool )
116
118
elif value_t == ARROW_TYPE_DATE32:
117
- builder = Date32Builder()
119
+ builder = Date32Builder(memory_pool = self .pool )
118
120
elif value_t == ARROW_TYPE_DATE64:
119
- builder = Date64Builder()
121
+ builder = Date64Builder(memory_pool = self .pool )
120
122
elif value_t == BSON_TYPE_INT32:
121
- builder = Int32Builder()
123
+ builder = Int32Builder(memory_pool = self .pool )
122
124
elif value_t == BSON_TYPE_INT64:
123
- builder = Int64Builder()
125
+ builder = Int64Builder(memory_pool = self .pool )
124
126
elif value_t == BSON_TYPE_DOUBLE:
125
- builder = DoubleBuilder()
127
+ builder = DoubleBuilder(memory_pool = self .pool )
126
128
elif value_t == BSON_TYPE_OID:
127
- builder = ObjectIdBuilder()
129
+ builder = ObjectIdBuilder(memory_pool = self .pool )
128
130
elif value_t == BSON_TYPE_UTF8:
129
- builder = StringBuilder()
131
+ builder = StringBuilder(memory_pool = self .pool )
130
132
elif value_t == BSON_TYPE_BOOL:
131
- builder = BoolBuilder()
133
+ builder = BoolBuilder(memory_pool = self .pool )
132
134
elif value_t == BSON_TYPE_DECIMAL128:
133
- builder = Decimal128Builder()
135
+ builder = Decimal128Builder(memory_pool = self .pool )
134
136
elif value_t == BSON_TYPE_CODE:
135
- builder = CodeBuilder()
137
+ builder = CodeBuilder(memory_pool = self .pool )
136
138
137
139
self .builder_map[key] = builder
138
140
return builder
@@ -175,12 +177,14 @@ cdef class BuilderManager:
175
177
# For lists, the nulls are stored in the parent.
176
178
if parent_type != BSON_TYPE_ARRAY:
177
179
if count > builder.length():
178
- builder.append_nulls(count - builder.length())
180
+ status = builder.append_nulls_raw(count - builder.length())
181
+ if not status.ok():
182
+ raise ValueError (" Failed to append nulls to" , full_key.decode(' utf8' ))
179
183
180
184
# Append the next value.
181
185
status = builder.append_raw(doc_iter, value_t)
182
186
if not status.ok():
183
- raise ValueError (" Could not append raw value" )
187
+ raise ValueError (" Could not append raw value to " , full_key.decode( ' utf8 ' ) )
184
188
185
189
# Recurse into documents.
186
190
if value_t == BSON_TYPE_DOCUMENT:
@@ -218,6 +222,7 @@ cdef class BuilderManager:
218
222
cdef dict return_map = {}
219
223
cdef bytes key
220
224
cdef str field
225
+ cdef CStatus status
221
226
cdef _ArrayBuilderBase value
222
227
223
228
# Move the builders to a new dict with string keys.
@@ -227,15 +232,17 @@ cdef class BuilderManager:
227
232
# Insert null fields.
228
233
for field in list (return_map):
229
234
if return_map[field] is None :
230
- return_map[field] = NullBuilder(self .count )
235
+ return_map[field] = NullBuilder(memory_pool = self .pool )
231
236
232
237
# Pad fields as needed.
233
238
for field, value in return_map.items():
234
239
# If it isn't a list item, append nulls as needed.
235
240
# For lists, the nulls are stored in the parent.
236
241
if not field.endswith(' []' ):
237
242
if value.length() < self .count:
238
- value.append_nulls(self .count - value.length())
243
+ status = value.append_nulls_raw(self .count - value.length())
244
+ if not status.ok():
245
+ raise ValueError (" Failed to append nulls to" , field)
239
246
240
247
return return_map
241
248
@@ -281,21 +288,36 @@ cdef class _ArrayBuilderBase:
281
288
def __len__(self):
    """Support ``len(builder)`` by reporting the same value as ``length()``."""
    size = self.length()
    return size
283
290
284
cpdef append_null(self):
    """Append a single null entry and surface a failure as an exception.

    Raises:
        ValueError: if the status returned by ``append_null_raw`` is not OK.
    """
    cdef CStatus outcome = self.append_null_raw()
    if outcome.ok():
        return
    raise ValueError(" Could not append null value")
286
295
287
296
cpdef append_nulls(self, uint64_t count):
    """Append ``count`` null entries by calling ``append_null`` repeatedly.

    Args:
        count: number of null entries to append; zero appends nothing.

    Raises:
        ValueError: propagated from ``append_null`` when an append fails.
    """
    # Declared with an object return (like ``append_null``) rather than
    # ``void``: on Cython versions where void functions do not propagate
    # exceptions by default, the ValueError raised below would otherwise be
    # printed and discarded instead of reaching the caller.
    for _ in range(count):
        self.append_null()
290
299
300
cdef CStatus append_null_raw(self):
    """Append one null through the wrapped C++ builder.

    Returns the builder's status unchecked; the caller decides how to react
    to a failure.
    """
    cdef shared_ptr[CArrayBuilder] handle = self.get_builder()
    return handle.get().AppendNull()
302
+
303
cdef CStatus append_nulls_raw(self, uint64_t count):
    """Append ``count`` nulls, stopping at the first failure.

    Each null goes through ``append_null_raw`` so that subclasses which
    override it keep their own bookkeeping.

    Args:
        count: number of null entries to append; zero appends nothing.

    Returns:
        The first non-OK status encountered, otherwise the status of the
        last append (or the initial value of ``status`` when ``count`` is 0).
    """
    cdef CStatus status
    for _ in range(count):
        status = self.append_null_raw()
        if not status.ok():
            return status
    # Return explicitly on the success path instead of falling off the end
    # of a function whose declared return type is CStatus.
    return status
309
+
291
310
cpdef uint64_t length(self):
    """Return the ``length()`` reported by the wrapped C++ builder."""
    cdef shared_ptr[CArrayBuilder] handle = self.get_builder()
    return handle.get().length()
293
312
294
313
def finish(self):
    """Finish the wrapped builder and return the result as a pyarrow array.

    Raises:
        ValueError: if the C++ ``Finish`` call reports a non-OK status.
    """
    cdef shared_ptr[CArrayBuilder] c_builder = self.get_builder()
    cdef shared_ptr[CArray] result
    cdef CStatus outcome
    # The C++ call runs without the GIL; the status is inspected afterwards
    # because raising needs the GIL.
    with nogil:
        outcome = c_builder.get().Finish(&result)
    if outcome.ok():
        return pyarrow_wrap_array(result)
    raise ValueError(" Failed to convert value to array")
300
322
301
323
@@ -526,13 +548,10 @@ cdef class Date32Builder(_ArrayBuilderBase):
526
548
cdef class NullBuilder(_ArrayBuilderBase):
527
549
cdef shared_ptr[CArrayBuilder] builder
528
550
529
def __cinit__(self, MemoryPool memory_pool=None):
    """Create an empty null builder.

    Args:
        memory_pool: optional pyarrow memory pool, unboxed with
            ``maybe_unbox_memory_pool`` and handed to the C++ builder.
    """
    cdef CMemoryPool* c_pool = maybe_unbox_memory_pool(memory_pool)
    self.type_marker = ARROW_TYPE_NULL
    # No nulls are appended here; the builder starts out empty.
    self.builder.reset(new CNullBuilder(c_pool))
536
555
537
556
cdef CStatus append_raw(self , bson_iter_t * doc_iter, bson_type_t value_t):
538
557
return self .builder.get().AppendNull()
@@ -596,11 +615,12 @@ cdef class Decimal128Builder(_ArrayBuilderBase):
596
615
cdef class BinaryBuilder(_ArrayBuilderBase):
597
616
cdef:
598
617
uint8_t _subtype
599
- shared_ptr[CBinaryBuilder ] builder
618
+ shared_ptr[CStringBuilder ] builder
600
619
601
def __cinit__(self, uint8_t subtype, MemoryPool memory_pool=None):
    """Create a builder for binary values of one subtype.

    Args:
        subtype: binary subtype stored on the builder as ``_subtype``.
        memory_pool: optional pyarrow memory pool, unboxed with
            ``maybe_unbox_memory_pool`` and handed to the C++ builder.
    """
    cdef CMemoryPool* c_pool = maybe_unbox_memory_pool(memory_pool)
    self.type_marker = BSON_TYPE_BINARY
    self._subtype = subtype
    # NOTE(review): the wrapped builder is a CStringBuilder even though this
    # class builds binary values - confirm the resulting Arrow type is the
    # intended one.
    self.builder.reset(new CStringBuilder(c_pool))
605
625
606
626
@property
@@ -630,7 +650,7 @@ cdef class DocumentBuilder(_ArrayBuilderBase):
630
650
""" The document builder stores a map of field names that can be retrieved as a set."""
631
651
cdef:
632
652
dict field_map
633
- int32_t count
653
+ int64_t count
634
654
635
655
def __cinit__ (self ):
636
656
self .type_marker = BSON_TYPE_DOCUMENT
@@ -643,8 +663,9 @@ cdef class DocumentBuilder(_ArrayBuilderBase):
643
663
cpdef uint64_t length(self):
    """Return the builder's running entry counter."""
    cdef uint64_t total = self.count
    return total
645
665
646
cdef CStatus append_null_raw(self):
    """Record a null entry by advancing the counter.

    Nothing is appended to an Arrow builder here, so the returned status is
    always OK.
    """
    self.count = self.count + 1
    return CStatus_OK()
648
669
649
670
cpdef void add_field(self , cstring field_name):
650
671
self .field_map[field_name] = 1
@@ -657,7 +678,7 @@ cdef class DocumentBuilder(_ArrayBuilderBase):
657
678
cdef class ListBuilder(_ArrayBuilderBase):
658
679
""" The list builder stores an int32 list of offsets and a counter with the current value."""
659
680
cdef:
660
- int32_t count
681
+ int64_t count
661
682
shared_ptr[CInt32Builder] builder
662
683
663
684
def __cinit__ (self , MemoryPool memory_pool = None ):
@@ -672,8 +693,8 @@ cdef class ListBuilder(_ArrayBuilderBase):
672
693
cpdef void append_count(self):
    """Advance the running counter by one."""
    self.count = self.count + 1
674
695
675
cdef CStatus append_null_raw(self):
    """Append the current counter value to the offsets builder.

    Returns the status of the underlying ``Append`` call unchecked.
    """
    # NOTE(review): ``count`` is declared int64_t while the offsets builder is
    # a CInt32Builder - confirm the value cannot exceed the int32 range.
    cdef CInt32Builder* offsets = self.builder.get()
    return offsets.Append(self.count)
677
698
678
699
cdef shared_ptr[CArrayBuilder] get_builder(self ):
679
700
return < shared_ptr[CArrayBuilder]> self .builder
0 commit comments