@@ -22,7 +22,7 @@ import sys
22
22
# Python imports
23
23
import bson
24
24
import numpy as np
25
- from pyarrow import timestamp
25
+ from pyarrow import timestamp, default_memory_pool
26
26
27
27
from pymongoarrow.errors import InvalidBSON
28
28
from pymongoarrow.types import ObjectIdType, Decimal128Type as Decimal128Type_, BinaryType, CodeType
@@ -62,22 +62,24 @@ cdef class BuilderManager:
62
62
uint64_t count
63
63
bint has_schema
64
64
object tzinfo
65
+ public object pool
65
66
66
67
def __cinit__ (self , dict schema_map , bint has_schema , object tzinfo ):
67
68
self .has_schema = has_schema
68
69
self .tzinfo = tzinfo
69
70
self .count = 0
70
71
self .builder_map = {}
72
+ self .pool = default_memory_pool()
71
73
# Unpack the schema map.
72
74
for fname, (ftype, arrow_type) in schema_map.items():
73
75
name = fname.encode(' utf-8' )
74
76
# special-case initializing builders for parameterized types
75
77
if ftype == BSON_TYPE_DATE_TIME:
76
78
if tzinfo is not None and arrow_type.tz is None :
77
79
arrow_type = timestamp(arrow_type.unit, tz = tzinfo) # noqa: PLW2901
78
- self .builder_map[name] = DatetimeBuilder(dtype = arrow_type)
80
+ self .builder_map[name] = DatetimeBuilder(dtype = arrow_type, memory_pool = self .pool )
79
81
elif ftype == BSON_TYPE_BINARY:
80
- self .builder_map[name] = BinaryBuilder(arrow_type.subtype)
82
+ self .builder_map[name] = BinaryBuilder(arrow_type.subtype, memory_pool = self .pool )
81
83
else :
82
84
# We only use the doc_iter for binary arrays, which are handled already.
83
85
self .get_builder(name, ftype, < bson_iter_t * > nullptr)
@@ -100,39 +102,39 @@ cdef class BuilderManager:
100
102
if value_t == BSON_TYPE_DATE_TIME:
101
103
if self .tzinfo is not None :
102
104
arrow_type = timestamp(' ms' , tz = self .tzinfo)
103
- builder = DatetimeBuilder(dtype = arrow_type)
105
+ builder = DatetimeBuilder(dtype = arrow_type, memory_pool = self .pool )
104
106
else :
105
- builder = DatetimeBuilder()
107
+ builder = DatetimeBuilder(memory_pool = self .pool )
106
108
elif value_t == BSON_TYPE_DOCUMENT:
107
109
builder = DocumentBuilder()
108
110
elif value_t == BSON_TYPE_ARRAY:
109
- builder = ListBuilder()
111
+ builder = ListBuilder(memory_pool = self .pool )
110
112
elif value_t == BSON_TYPE_BINARY:
111
113
if doc_iter == NULL :
112
114
raise ValueError (' Did not pass a doc_iter!' )
113
115
bson_iter_binary (doc_iter, & subtype,
114
116
& val_buf_len, & val_buf)
115
- builder = BinaryBuilder(subtype)
117
+ builder = BinaryBuilder(subtype, memory_pool = self .pool )
116
118
elif value_t == ARROW_TYPE_DATE32:
117
- builder = Date32Builder()
119
+ builder = Date32Builder(memory_pool = self .pool )
118
120
elif value_t == ARROW_TYPE_DATE64:
119
- builder = Date64Builder()
121
+ builder = Date64Builder(memory_pool = self .pool )
120
122
elif value_t == BSON_TYPE_INT32:
121
- builder = Int32Builder()
123
+ builder = Int32Builder(memory_pool = self .pool )
122
124
elif value_t == BSON_TYPE_INT64:
123
- builder = Int64Builder()
125
+ builder = Int64Builder(memory_pool = self .pool )
124
126
elif value_t == BSON_TYPE_DOUBLE:
125
- builder = DoubleBuilder()
127
+ builder = DoubleBuilder(memory_pool = self .pool )
126
128
elif value_t == BSON_TYPE_OID:
127
- builder = ObjectIdBuilder()
129
+ builder = ObjectIdBuilder(memory_pool = self .pool )
128
130
elif value_t == BSON_TYPE_UTF8:
129
- builder = StringBuilder()
131
+ builder = StringBuilder(memory_pool = self .pool )
130
132
elif value_t == BSON_TYPE_BOOL:
131
- builder = BoolBuilder()
133
+ builder = BoolBuilder(memory_pool = self .pool )
132
134
elif value_t == BSON_TYPE_DECIMAL128:
133
- builder = Decimal128Builder()
135
+ builder = Decimal128Builder(memory_pool = self .pool )
134
136
elif value_t == BSON_TYPE_CODE:
135
- builder = CodeBuilder()
137
+ builder = CodeBuilder(memory_pool = self .pool )
136
138
137
139
self .builder_map[key] = builder
138
140
return builder
@@ -175,12 +177,15 @@ cdef class BuilderManager:
175
177
# For lists, the nulls are stored in the parent.
176
178
if parent_type != BSON_TYPE_ARRAY:
177
179
if count > builder.length():
178
- builder.append_nulls(count - builder.length())
180
+ for _ in range (count - builder.length()):
181
+ status = builder.append_null_raw()
182
+ if not status.ok():
183
+ raise ValueError (" Could not append nulls to" , full_key)
179
184
180
185
# Append the next value.
181
186
status = builder.append_raw(doc_iter, value_t)
182
187
if not status.ok():
183
- raise ValueError (" Could not append raw value" )
188
+ raise ValueError (" Could not append raw value to " , full_key, type (builder), self .count )
184
189
185
190
# Recurse into documents.
186
191
if value_t == BSON_TYPE_DOCUMENT:
@@ -196,9 +201,6 @@ cdef class BuilderManager:
196
201
if parent_type == BSON_TYPE_ARRAY:
197
202
(< ListBuilder> self .builder_map[base_key]).append_count()
198
203
199
- # Update our count for top level documents.
200
- if parent_type == 0 :
201
- self .count += 1
202
204
203
205
cpdef void process_bson_stream(self , const uint8_t* bson_stream, size_t length):
204
206
""" Process a bson byte stream."""
@@ -212,6 +214,7 @@ cdef class BuilderManager:
212
214
break
213
215
if not bson_iter_init(& doc_iter, doc):
214
216
raise InvalidBSON(" Could not read BSON document" )
217
+ self .count += 1
215
218
self .parse_document(& doc_iter, b" " , 0 )
216
219
finally :
217
220
bson_reader_destroy(stream_reader)
@@ -221,6 +224,7 @@ cdef class BuilderManager:
221
224
cdef dict return_map = {}
222
225
cdef bytes key
223
226
cdef str field
227
+ cdef CStatus status
224
228
cdef _ArrayBuilderBase value
225
229
226
230
# Move the builders to a new dict with string keys.
@@ -230,15 +234,17 @@ cdef class BuilderManager:
230
234
# Insert null fields.
231
235
for field in list (return_map):
232
236
if return_map[field] is None :
233
- return_map[field] = NullBuilder(self .count )
237
+ return_map[field] = NullBuilder()
234
238
235
239
# Pad fields as needed.
236
240
for field, value in return_map.items():
237
241
# If it isn't a list item, append nulls as needed.
238
242
# For lists, the nulls are stored in the parent.
239
243
if not field.endswith(' []' ):
240
244
if value.length() < self .count:
241
- value.append_nulls(self .count - value.length())
245
+ status = value.append_null_raw()
246
+ if not status.ok():
247
+ raise ValueError (" Could not append nulls to" , field)
242
248
243
249
return return_map
244
250
@@ -250,9 +256,11 @@ cdef class _ArrayBuilderBase:
250
256
def append_values(self, values):
    """Append every item of ``values`` to this builder, in order.

    Items that are ``None`` or the ``np.nan`` singleton are stored as nulls
    through ``append_null``; every other item is handed to ``append``.

    Raises:
        ValueError: if ``append`` hands back a status object whose ``ok()``
            is false. ``append_null`` raises ``ValueError`` on its own when
            the null could not be appended.
    """
    for value in values:
        # Identity comparison on purpose: only None and the np.nan singleton
        # are treated as missing values here.
        if value is None or value is np.nan:
            # append_null() checks its own status and returns nothing, so
            # there is no status object to inspect for the null path.
            self.append_null()
            continue
        status = self.append(value)
        # NOTE(review): the body of append() is not visible from here. When
        # it returns nothing we rely on it raising; when it returns a status
        # we keep honoring it - confirm which contract append() follows.
        if status is not None and not status.ok():
            raise ValueError("Failed to append value")
256
264
257
265
def append (self , value ):
258
266
""" Interface to append a python value to the builder.
@@ -284,12 +292,13 @@ cdef class _ArrayBuilderBase:
284
292
def __len__(self):
    """Return the builder's element count, as reported by ``length()``."""
    size = self.length()
    return size
286
294
287
def append_null(self):
    """Append a single null entry to this builder.

    Raises:
        ValueError: if the status returned by ``append_null_raw`` is not ok.
    """
    if not self.append_null_raw().ok():
        raise ValueError("Could not append null")

cdef CStatus append_null_raw(self):
    """Append one null through ``get_builder()`` and return the status unchecked."""
    # Keep the shared_ptr in a local so the raw pointer is used while a
    # reference is held.
    cdef shared_ptr[CArrayBuilder] holder = self.get_builder()
    return holder.get().AppendNull()
293
302
294
303
cpdef uint64_t length(self):
    """Return the length reported by the builder that ``get_builder()`` yields."""
    cdef shared_ptr[CArrayBuilder] holder = self.get_builder()
    return holder.get().length()
@@ -529,13 +538,10 @@ cdef class Date32Builder(_ArrayBuilderBase):
529
538
cdef class NullBuilder(_ArrayBuilderBase):
530
539
cdef shared_ptr[CArrayBuilder] builder
531
540
532
def __cinit__(self, MemoryPool memory_pool=None):
    """Create an empty null builder constructed with ``memory_pool``.

    ``memory_pool`` is passed through ``maybe_unbox_memory_pool``; how a
    ``None`` pool is resolved is decided by that helper.
    """
    self.type_marker = ARROW_TYPE_NULL
    self.builder.reset(new CNullBuilder(maybe_unbox_memory_pool(memory_pool)))
539
545
540
546
cdef CStatus append_raw(self, bson_iter_t * doc_iter, bson_type_t value_t):
    """Append a null and return the resulting status.

    Neither ``doc_iter`` nor ``value_t`` is read by this builder.
    """
    cdef CStatus status = self.builder.get().AppendNull()
    return status
@@ -601,9 +607,10 @@ cdef class BinaryBuilder(_ArrayBuilderBase):
601
607
uint8_t _subtype
602
608
shared_ptr[CBinaryBuilder] builder
603
609
604
def __cinit__(self, uint8_t subtype, MemoryPool memory_pool=None):
    """Create a binary builder that remembers ``subtype``.

    The underlying ``CBinaryBuilder`` is constructed with the pool obtained
    from ``maybe_unbox_memory_pool(memory_pool)``.
    """
    cdef CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
    self.type_marker = BSON_TYPE_BINARY
    self._subtype = subtype
    self.builder.reset(new CBinaryBuilder(pool))
608
615
609
616
@property
@@ -646,8 +653,9 @@ cdef class DocumentBuilder(_ArrayBuilderBase):
646
653
cpdef uint64_t length(self):
    """Return the value of the ``count`` attribute."""
    cdef uint64_t current = self.count
    return current
648
655
649
cdef CStatus append_null_raw(self):
    """Record one more entry by incrementing ``count``; always returns an OK status."""
    self.count = self.count + 1
    return CStatus_OK()
651
659
652
660
cpdef void add_field(self , cstring field_name):
653
661
self .field_map[field_name] = 1
@@ -675,8 +683,8 @@ cdef class ListBuilder(_ArrayBuilderBase):
675
683
cpdef void append_count(self):
    """Increase the running ``count`` by one."""
    self.count = self.count + 1
677
685
678
cdef CStatus append_null_raw(self):
    """Append the current ``count`` to the underlying builder and return the status.

    No status check happens here; the caller inspects the returned value.
    """
    # NOTE(review): presumably ``count`` acts as the running child offset, so
    # repeating it marks an entry with no new children - confirm.
    cdef CStatus status = self.builder.get().Append(self.count)
    return status
680
688
681
689
cdef shared_ptr[CArrayBuilder] get_builder(self):
    """Return this object's ``builder`` cast to ``shared_ptr[CArrayBuilder]``."""
    cdef shared_ptr[CArrayBuilder] base_ptr = <shared_ptr[CArrayBuilder]>self.builder
    return base_ptr
0 commit comments