Skip to content

Commit 55b0436

Browse files
dongxiao1198xiao.dongFokkozhjwpku
authored
feat: support Avro writer (#173)
1. Add the Avro writer and its factory (without the write function, since the converter in pull/166 is not finished yet). 2. Add the file metrics definition. --------- Co-authored-by: xiao.dong <[email protected]> Co-authored-by: Fokko Driesprong <[email protected]> Co-authored-by: Junwang Zhao <[email protected]>
1 parent 25d4d62 commit 55b0436

File tree

7 files changed

+322
-8
lines changed

7 files changed

+322
-8
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ set(ICEBERG_SOURCES
2323
expression/expression.cc
2424
expression/literal.cc
2525
file_reader.cc
26+
file_writer.cc
2627
json_internal.cc
2728
manifest_entry.cc
2829
manifest_list.cc
@@ -107,6 +108,7 @@ if(ICEBERG_BUILD_BUNDLE)
107108
arrow/arrow_fs_file_io.cc
108109
avro/avro_data_util.cc
109110
avro/avro_reader.cc
111+
avro/avro_writer.cc
110112
avro/avro_schema_util.cc
111113
avro/avro_register.cc
112114
avro/avro_stream_internal.cc

src/iceberg/avro/avro_reader.cc

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include <avro/Generic.hh>
3232
#include <avro/GenericDatum.hh>
3333

34+
#include "iceberg/arrow/arrow_error_transform_internal.h"
3435
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
3536
#include "iceberg/avro/avro_data_util_internal.h"
3637
#include "iceberg/avro/avro_schema_util_internal.h"
@@ -51,13 +52,8 @@ Result<std::unique_ptr<AvroInputStream>> CreateInputStream(const ReaderOptions&
5152
}
5253

5354
auto io = internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
54-
auto result = io->fs()->OpenInputFile(file_info);
55-
if (!result.ok()) {
56-
return IOError("Failed to open file {} for {}", options.path,
57-
result.status().message());
58-
}
59-
60-
return std::make_unique<AvroInputStream>(result.MoveValueUnsafe(), buffer_size);
55+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, io->fs()->OpenInputFile(file_info));
56+
return std::make_unique<AvroInputStream>(file, buffer_size);
6157
}
6258

6359
} // namespace

src/iceberg/avro/avro_writer.cc

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/avro/avro_writer.h"
21+
22+
#include <memory>
23+
24+
#include <arrow/array/builder_base.h>
25+
#include <arrow/c/bridge.h>
26+
#include <arrow/record_batch.h>
27+
#include <arrow/result.h>
28+
#include <avro/DataFile.hh>
29+
#include <avro/GenericDatum.hh>
30+
31+
#include "iceberg/arrow/arrow_error_transform_internal.h"
32+
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
33+
#include "iceberg/avro/avro_schema_util_internal.h"
34+
#include "iceberg/avro/avro_stream_internal.h"
35+
#include "iceberg/schema.h"
36+
#include "iceberg/util/checked_cast.h"
37+
#include "iceberg/util/macros.h"
38+
39+
namespace iceberg::avro {
40+
41+
namespace {
42+
43+
/// \brief Open `options.path` for writing and wrap the result in an AvroOutputStream.
///
/// \param options Writer options; `options.io` is down-cast to
///        `arrow::ArrowFileSystemFileIO` and `options.path` names the file to open.
/// \param buffer_size Forwarded unchanged to the `AvroOutputStream` constructor.
/// \return The wrapped stream; if opening the file fails, the error is propagated by
///         ICEBERG_ARROW_ASSIGN_OR_RETURN instead.
Result<std::unique_ptr<AvroOutputStream>> CreateOutputStream(const WriterOptions& options,
                                                             int64_t buffer_size) {
  auto arrow_io = internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
  const auto& filesystem = arrow_io->fs();
  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto arrow_stream,
                                 filesystem->OpenOutputStream(options.path));
  return std::make_unique<AvroOutputStream>(arrow_stream, buffer_size);
}
49+
50+
} // namespace
51+
52+
// A stateful context to keep track of the writing progress.
53+
// Currently a placeholder: the struct declares no members yet.
struct WriteContext {};
54+
55+
/// \brief Private implementation behind AvroWriter.
///
/// Holds the Iceberg schema, the Avro schema derived from it and the underlying
/// `::avro::DataFileWriter`. The object reports itself as closed whenever `writer_`
/// is null, which is also its state before `Open()` has succeeded.
class AvroWriter::Impl {
 public:
  /// \brief Convert the schema, open the output stream and create the Avro file writer.
  ///
  /// \param options Supplies the schema to write (`options.schema` is dereferenced
  ///        without a null check) and the path/FileIO used to open the output.
  /// \return An empty status on success; otherwise the error propagated from the
  ///         schema conversion or from opening the output stream.
  Status Open(const WriterOptions& options) {
    write_schema_ = options.schema;

    ::avro::NodePtr root;
    ICEBERG_RETURN_UNEXPECTED(ToAvroNodeVisitor{}.Visit(*write_schema_, &root));

    avro_schema_ = std::make_shared<::avro::ValidSchema>(root);

    // Open the output stream and adapt to the avro interface.
    constexpr int64_t kDefaultBufferSize = 1024 * 1024;
    ICEBERG_ASSIGN_OR_RAISE(auto output_stream,
                            CreateOutputStream(options, kDefaultBufferSize));

    writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>(
        std::move(output_stream), *avro_schema_);
    return {};
  }

  /// \brief Write a batch of data. Not implemented yet: the argument is ignored and
  /// an empty status is returned.
  Status Write(ArrowArray /*data*/) {
    // TODO(xiao.dong) convert data and write to avro
    // total_bytes_+= written_bytes;
    return {};
  }

  /// \brief Close and release the Avro file writer if one is open; otherwise a no-op.
  Status Close() {
    if (writer_ != nullptr) {
      writer_->close();
      writer_.reset();
    }
    return {};
  }

  /// \brief Whether no Avro file writer is currently held.
  bool Closed() const { return writer_ == nullptr; }

  /// \brief Bytes recorded in `total_bytes_`. Nothing in this class updates that
  /// counter yet, so the value is still its initial 0.
  int64_t length() const { return total_bytes_; }

 private:
  // Running byte count; see the TODO in Write().
  int64_t total_bytes_ = 0;
  // The schema to write.
  std::shared_ptr<::iceberg::Schema> write_schema_;
  // The avro schema to write.
  std::shared_ptr<::avro::ValidSchema> avro_schema_;
  // The Avro data file writer bound to the output stream; null while closed.
  std::unique_ptr<::avro::DataFileWriter<::avro::GenericDatum>> writer_;
};
102+
103+
// Defaulted here, after Impl's definition, so that the std::unique_ptr<Impl> member
// is destroyed where Impl is a complete type.
AvroWriter::~AvroWriter() = default;
104+
105+
/// \brief Forward one batch of data to the implementation object.
///
/// `impl_` is dereferenced unconditionally, so `Open()` must have been called first.
Status AvroWriter::Write(ArrowArray data) {
  Status write_status = impl_->Write(data);
  return write_status;
}
106+
107+
/// \brief Create a fresh implementation object and open it with `options`.
///
/// Any previously held implementation object is replaced. The new object is kept
/// even when opening fails.
///
/// \return The status reported by the implementation's `Open()`.
Status AvroWriter::Open(const WriterOptions& options) {
  impl_ = std::make_unique<Impl>();
  Status open_status = impl_->Open(options);
  return open_status;
}
111+
112+
/// \brief Close the underlying writer if one is open.
///
/// Calling this on a writer that was never opened, or that is already closed, is a
/// no-op that returns an empty status.
///
/// \return The status reported by the implementation's `Close()`, or an empty status
///         when there is nothing to close.
Status AvroWriter::Close() {
  // `impl_` is only created by Open(); guard it so that closing a never-opened
  // writer does not dereference a null pointer.
  if (impl_ == nullptr || impl_->Closed()) {
    return {};
  }
  return impl_->Close();
}
118+
119+
std::optional<Metrics> AvroWriter::metrics() {
120+
if (impl_->Closed()) {
121+
// TODO(xiao.dong) implement metrics
122+
return {};
123+
}
124+
return std::nullopt;
125+
}
126+
127+
std::optional<int64_t> AvroWriter::length() {
128+
if (impl_->Closed()) {
129+
return impl_->length();
130+
}
131+
return std::nullopt;
132+
}
133+
134+
/// \brief Report split offsets; this implementation always returns an empty list.
std::vector<int64_t> AvroWriter::split_offsets() {
  return std::vector<int64_t>{};
}
135+
136+
void AvroWriter::Register() {
137+
static WriterFactoryRegistry avro_writer_register(
138+
FileFormatType::kAvro,
139+
[]() -> Result<std::unique_ptr<Writer>> { return std::make_unique<AvroWriter>(); });
140+
}
141+
142+
} // namespace iceberg::avro

src/iceberg/avro/avro_writer.h

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include "iceberg/file_writer.h"
23+
#include "iceberg/iceberg_bundle_export.h"
24+
25+
namespace iceberg::avro {
26+
27+
/// \brief A writer for serializing ArrowArray to Avro files.
28+
class ICEBERG_BUNDLE_EXPORT AvroWriter : public Writer {
29+
public:
30+
AvroWriter() = default;
31+
32+
~AvroWriter() override;
33+
34+
Status Open(const WriterOptions& options) final;
35+
36+
Status Close() final;
37+
38+
Status Write(ArrowArray data) final;
39+
40+
std::optional<Metrics> metrics() final;
41+
42+
std::optional<int64_t> length() final;
43+
44+
std::vector<int64_t> split_offsets() final;
45+
46+
/// \brief Register this Avro writer implementation.
47+
static void Register();
48+
49+
private:
50+
class Impl;
51+
std::unique_ptr<Impl> impl_;
52+
};
53+
54+
} // namespace iceberg::avro

src/iceberg/file_writer.cc

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/file_writer.h"
21+
22+
#include <unordered_map>
23+
24+
#include "iceberg/result.h"
25+
#include "iceberg/util/formatter.h"
26+
#include "iceberg/util/macros.h"
27+
28+
namespace iceberg {
29+
30+
namespace {
31+
32+
/// \brief Build a placeholder factory for a format with no registered writer.
///
/// \param format_type Format captured by the returned factory for its error message.
/// \return A factory whose invocation yields a NotImplemented error naming the format.
WriterFactory GetNotImplementedFactory(FileFormatType format_type) {
  auto missing_factory = [format_type]() -> Result<std::unique_ptr<Writer>> {
    return NotImplemented("Missing writer factory for file format: {}", format_type);
  };
  return missing_factory;
}
37+
38+
} // namespace
39+
40+
/// \brief Look up the mutable factory slot for a file format.
///
/// The table is a function-local static seeded with a "not implemented" placeholder
/// for Avro, Parquet, ORC and Puffin. Lookup uses `at()`, so a format outside that
/// set throws `std::out_of_range`.
///
/// \param format_type The format whose factory slot is requested.
/// \return A reference to the stored factory, which callers may overwrite.
WriterFactory& WriterFactoryRegistry::GetFactory(FileFormatType format_type) {
  static std::unordered_map<FileFormatType, WriterFactory> registry = [] {
    std::unordered_map<FileFormatType, WriterFactory> defaults;
    for (const FileFormatType known_format :
         {FileFormatType::kAvro, FileFormatType::kParquet, FileFormatType::kOrc,
          FileFormatType::kPuffin}) {
      defaults.emplace(known_format, GetNotImplementedFactory(known_format));
    }
    return defaults;
  }();
  return registry.at(format_type);
}
49+
50+
/// \brief Register `factory` as the writer factory for `format_type`.
///
/// Constructing the object overwrites whatever factory was stored for that format.
WriterFactoryRegistry::WriterFactoryRegistry(FileFormatType format_type,
                                             WriterFactory factory) {
  WriterFactory& slot = GetFactory(format_type);
  slot = std::move(factory);
}
54+
55+
/// \brief Create a writer for `format_type` and open it with `options`.
///
/// \param format_type Selects which registered factory creates the writer.
/// \param options Passed to the new writer's `Open()`.
/// \return The opened writer; an error from the factory or from `Open()` is
///         propagated by the ICEBERG_* macros instead.
Result<std::unique_ptr<Writer>> WriterFactoryRegistry::Open(
    FileFormatType format_type, const WriterOptions& options) {
  WriterFactory& create_writer = GetFactory(format_type);
  ICEBERG_ASSIGN_OR_RAISE(auto opened_writer, create_writer());
  ICEBERG_RETURN_UNEXPECTED(opened_writer->Open(options));
  return opened_writer;
}
61+
62+
} // namespace iceberg

src/iceberg/file_writer.h

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
#include "iceberg/arrow_c_data.h"
3030
#include "iceberg/file_format.h"
31+
#include "iceberg/metrics.h"
3132
#include "iceberg/result.h"
3233
#include "iceberg/type_fwd.h"
3334

@@ -38,7 +39,7 @@ struct ICEBERG_EXPORT WriterOptions {
3839
/// \brief The path to the file to write.
3940
std::string path;
4041
/// \brief The schema of the data to write.
41-
ArrowSchema schema;
42+
std::shared_ptr<Schema> schema;
4243
/// \brief FileIO instance to open the file. Writer implementations should down cast it
4344
/// to the specific FileIO implementation. By default, the `iceberg-bundle` library uses
4445
/// `ArrowFileSystemFileIO` as the default implementation.
@@ -65,6 +66,20 @@ class ICEBERG_EXPORT Writer {
6566
///
6667
/// \return Status of write results.
6768
virtual Status Write(ArrowArray data) = 0;
69+
70+
/// \brief Get the file statistics.
71+
/// Only valid after the file is closed.
72+
virtual std::optional<Metrics> metrics() = 0;
73+
74+
/// \brief Get the file length.
75+
/// Only valid after the file is closed.
76+
virtual std::optional<int64_t> length() = 0;
77+
78+
/// \brief Returns a list of recommended split locations, if applicable, empty
79+
/// otherwise. When available, this information is used for planning scan tasks whose
80+
/// boundaries are determined by these offsets. The returned list must be sorted in
81+
/// ascending order. Only valid after the file is closed.
82+
virtual std::vector<int64_t> split_offsets() = 0;
6883
};
6984

7085
/// \brief Factory function to create a writer of a specific file format.

src/iceberg/metrics.h

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/metrics.h
23+
/// Iceberg file format metrics
24+
25+
#include <unordered_map>
26+
27+
#include "iceberg/expression/literal.h"
28+
#include "iceberg/iceberg_export.h"
29+
30+
namespace iceberg {
31+
32+
/// \brief Iceberg file format metrics
33+
struct ICEBERG_EXPORT Metrics {
34+
int64_t row_count = 0;
35+
std::unordered_map<int64_t, int64_t> column_sizes;
36+
std::unordered_map<int64_t, int64_t> value_counts;
37+
std::unordered_map<int64_t, int64_t> null_value_counts;
38+
std::unordered_map<int64_t, int64_t> nan_value_counts;
39+
std::unordered_map<int64_t, Literal> lower_bounds;
40+
std::unordered_map<int64_t, Literal> upper_bounds;
41+
};
42+
43+
} // namespace iceberg

0 commit comments

Comments
 (0)