Skip to content

Commit d40c29f

Browse files
committed
feat: add row-based immutable data structure
- Add StructLike, MapLike, and ArrayLike interfaces - Add wrapper for ManifestFile and ArrowArray
1 parent f2d0abd commit d40c29f

12 files changed

+1309
-29
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ set(ICEBERG_SOURCES
3131
name_mapping.cc
3232
partition_field.cc
3333
partition_spec.cc
34+
row/arrow_array_wrapper.cc
35+
row/manifest_wrapper.cc
3436
schema.cc
3537
schema_field.cc
3638
schema_internal.cc
@@ -98,6 +100,7 @@ iceberg_install_all_headers(iceberg)
98100

99101
add_subdirectory(catalog)
100102
add_subdirectory(expression)
103+
add_subdirectory(row)
101104
add_subdirectory(util)
102105

103106
install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_export.h

src/iceberg/manifest_reader_internal.cc

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
#include "manifest_reader_internal.h"
2121

22-
#include <array>
23-
2422
#include <nanoarrow/nanoarrow.h>
2523

2624
#include "iceberg/arrow_c_data_guard_internal.h"
@@ -29,6 +27,7 @@
2927
#include "iceberg/manifest_list.h"
3028
#include "iceberg/schema.h"
3129
#include "iceberg/type.h"
30+
#include "iceberg/util/checked_cast.h"
3231
#include "iceberg/util/macros.h"
3332

3433
namespace iceberg {
@@ -39,7 +38,7 @@ namespace iceberg {
3938
}
4039

4140
#define PARSE_PRIMITIVE_FIELD(item, array_view, type) \
42-
for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) { \
41+
for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) { \
4342
if (!ArrowArrayViewIsNull(array_view, row_idx)) { \
4443
auto value = ArrowArrayViewGetIntUnsafe(array_view, row_idx); \
4544
item = static_cast<type>(value); \
@@ -50,7 +49,7 @@ namespace iceberg {
5049
}
5150

5251
#define PARSE_STRING_FIELD(item, array_view) \
53-
for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) { \
52+
for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) { \
5453
if (!ArrowArrayViewIsNull(array_view, row_idx)) { \
5554
auto value = ArrowArrayViewGetStringUnsafe(array_view, row_idx); \
5655
item = std::string(value.data, value.size_bytes); \
@@ -61,7 +60,7 @@ namespace iceberg {
6160
}
6261

6362
#define PARSE_BINARY_FIELD(item, array_view) \
64-
for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) { \
63+
for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) { \
6564
if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { \
6665
item = ArrowArrayViewGetInt8Vector(array_view, row_idx); \
6766
} else if (required) { \
@@ -227,66 +226,67 @@ Result<std::vector<ManifestFile>> ParseManifestList(ArrowSchema* schema,
227226
auto field_name = field.value().get().name();
228227
bool required = !field.value().get().optional();
229228
auto view_of_column = array_view.children[idx];
230-
switch (idx) {
231-
case 0:
229+
ICEBERG_ASSIGN_OR_RAISE(auto manifest_file_field, ManifestFileFieldFromIndex(idx));
230+
switch (manifest_file_field) {
231+
case ManifestFileField::kManifestPath:
232232
PARSE_STRING_FIELD(manifest_files[row_idx].manifest_path, view_of_column);
233233
break;
234-
case 1:
234+
case ManifestFileField::kManifestLength:
235235
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].manifest_length, view_of_column,
236236
int64_t);
237237
break;
238-
case 2:
238+
case ManifestFileField::kPartitionSpecId:
239239
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].partition_spec_id, view_of_column,
240240
int32_t);
241241
break;
242-
case 3:
242+
case ManifestFileField::kContent:
243243
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].content, view_of_column,
244244
ManifestFile::Content);
245245
break;
246-
case 4:
246+
case ManifestFileField::kSequenceNumber:
247247
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].sequence_number, view_of_column,
248248
int64_t);
249249
break;
250-
case 5:
250+
case ManifestFileField::kMinSequenceNumber:
251251
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].min_sequence_number, view_of_column,
252252
int64_t);
253253
break;
254-
case 6:
254+
case ManifestFileField::kAddedSnapshotId:
255255
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_snapshot_id, view_of_column,
256256
int64_t);
257257
break;
258-
case 7:
258+
case ManifestFileField::kAddedFilesCount:
259259
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_files_count, view_of_column,
260260
int32_t);
261261
break;
262-
case 8:
262+
case ManifestFileField::kExistingFilesCount:
263263
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_files_count,
264264
view_of_column, int32_t);
265265
break;
266-
case 9:
266+
case ManifestFileField::kDeletedFilesCount:
267267
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_files_count, view_of_column,
268268
int32_t);
269269
break;
270-
case 10:
270+
case ManifestFileField::kAddedRowsCount:
271271
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_rows_count, view_of_column,
272272
int64_t);
273273
break;
274-
case 11:
274+
case ManifestFileField::kExistingRowsCount:
275275
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_rows_count, view_of_column,
276276
int64_t);
277277
break;
278-
case 12:
278+
case ManifestFileField::kDeletedRowsCount:
279279
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_rows_count, view_of_column,
280280
int64_t);
281281
break;
282-
case 13:
282+
case ManifestFileField::kPartitionFieldSummary:
283283
ICEBERG_RETURN_UNEXPECTED(
284284
ParsePartitionFieldSummaryList(view_of_column, manifest_files));
285285
break;
286-
case 14:
286+
case ManifestFileField::kKeyMetadata:
287287
PARSE_BINARY_FIELD(manifest_files[row_idx].key_metadata, view_of_column);
288288
break;
289-
case 15:
289+
case ManifestFileField::kFirstRowId:
290290
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].first_row_id, view_of_column,
291291
int64_t);
292292
break;
@@ -297,7 +297,7 @@ Result<std::vector<ManifestFile>> ParseManifestList(ArrowSchema* schema,
297297
return manifest_files;
298298
}
299299

300-
Status ParseLiteral(ArrowArrayView* view_of_partition, size_t row_idx,
300+
Status ParseLiteral(ArrowArrayView* view_of_partition, int64_t row_idx,
301301
std::vector<ManifestEntry>& manifest_entries) {
302302
if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_BOOL) {
303303
auto value = ArrowArrayViewGetUIntUnsafe(view_of_partition, row_idx);
@@ -357,7 +357,7 @@ Status ParseDataFile(const std::shared_ptr<StructType>& data_file_schema,
357357
view_of_file_field);
358358
break;
359359
case 2:
360-
for (size_t row_idx = 0; row_idx < view_of_file_field->length; row_idx++) {
360+
for (int64_t row_idx = 0; row_idx < view_of_file_field->length; row_idx++) {
361361
if (!ArrowArrayViewIsNull(view_of_file_field, row_idx)) {
362362
auto value = ArrowArrayViewGetStringUnsafe(view_of_file_field, row_idx);
363363
std::string_view path_str(value.data, value.size_bytes);
@@ -512,7 +512,7 @@ Result<std::vector<ManifestEntry>> ParseManifestEntry(ArrowSchema* schema,
512512
break;
513513
case 4: {
514514
auto data_file_schema =
515-
dynamic_pointer_cast<StructType>(field.value().get().type());
515+
internal::checked_pointer_cast<StructType>(field.value().get().type());
516516
ICEBERG_RETURN_UNEXPECTED(
517517
ParseDataFile(data_file_schema, view_of_column, manifest_entries));
518518
break;
@@ -567,4 +567,11 @@ Result<std::vector<ManifestFile>> ManifestListReaderImpl::Files() const {
567567
return manifest_files;
568568
}
569569

570+
Result<ManifestFileField> ManifestFileFieldFromIndex(int32_t index) {
571+
if (index >= 0 && index < static_cast<int32_t>(ManifestFileField::kNextId)) {
572+
return static_cast<ManifestFileField>(index);
573+
}
574+
return InvalidArgument("Invalid manifest file field index: {}", index);
575+
}
576+
570577
} // namespace iceberg

src/iceberg/manifest_reader_internal.h

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,26 @@ class ManifestListReaderImpl : public ManifestListReader {
5454
std::unique_ptr<Reader> reader_;
5555
};
5656

57+
enum class ManifestFileField : int32_t {
58+
kManifestPath = 0,
59+
kManifestLength,
60+
kPartitionSpecId = 2,
61+
kContent = 3,
62+
kSequenceNumber = 4,
63+
kMinSequenceNumber = 5,
64+
kAddedSnapshotId = 6,
65+
kAddedFilesCount = 7,
66+
kExistingFilesCount = 8,
67+
kDeletedFilesCount = 9,
68+
kAddedRowsCount = 10,
69+
kExistingRowsCount = 11,
70+
kDeletedRowsCount = 12,
71+
kPartitionFieldSummary = 13,
72+
kKeyMetadata = 14,
73+
kFirstRowId = 15,
74+
kNextId = 16,
75+
};
76+
77+
Result<ManifestFileField> ManifestFileFieldFromIndex(int32_t index);
78+
5779
} // namespace iceberg

src/iceberg/row/CMakeLists.txt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
iceberg_install_all_headers(iceberg/row)

0 commit comments

Comments
 (0)