Skip to content

Commit b55fc51

Browse files
macvincent authored and facebook-github-bot committed
Write Column Logical Size Into Nimble Files (facebookincubator#179)
Summary: Pull Request resolved: facebookincubator#179 Differential Revision: D75973860
1 parent 1c50478 commit b55fc51

File tree

1 file changed

+62
-14
lines changed

1 file changed

+62
-14
lines changed

dwio/nimble/velox/VeloxWriter.cpp

Lines changed: 62 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -36,6 +36,7 @@
3636
#include "dwio/nimble/velox/SchemaSerialization.h"
3737
#include "dwio/nimble/velox/SchemaTypes.h"
3838
#include "dwio/nimble/velox/StatsGenerated.h"
39+
#include "dwio/nimble/velox/StreamLabels.h"
3940
#include "folly/ScopeGuard.h"
4041
#include "velox/common/time/CpuWallTimer.h"
4142
#include "velox/dwio/common/ExecutorBarrier.h"
@@ -47,6 +48,11 @@ namespace detail {
4748

4849
class WriterContext : public FieldWriterContext {
4950
public:
51+
// Per-column counters kept on the writer context. VeloxWriter::write() adds
// each incoming batch's per-column values into these, so they are running
// totals across write() calls.
struct ColumnStats {
52+
// Sum of the per-column sizes reported by RawSizeContext::sizeAt().
uint64_t logicalSize{0};
53+
// Sum of the per-column null counts reported by RawSizeContext::nullsAt().
uint64_t nullCount{0};
54+
};
55+
5056
const VeloxWriterOptions options;
5157
std::unique_ptr<FlushPolicy> flushPolicy;
5258
velox::CpuWallTiming totalFlushTiming;
@@ -63,6 +69,8 @@ class WriterContext : public FieldWriterContext {
6369
uint64_t stripeSize{0};
6470
uint64_t rawSize{0};
6571
std::vector<uint64_t> rowsPerStripe;
72+
std::unordered_map<offset_size, std::atomic<uint64_t>> streamPhysicalSize;
73+
std::vector<ColumnStats> columnStats;
6674

6775
WriterContext(
6876
velox::memory::MemoryPool& memoryPool,
@@ -516,10 +524,20 @@ bool VeloxWriter::write(const velox::VectorPtr& vector) {
516524
auto size = vector->size();
517525

518526
// Calculate raw size.
519-
auto rawSize = nimble::getRawSizeFromVector(
520-
vector, velox::common::Ranges::of(0, size));
527+
RawSizeContext context;
528+
auto rawSize = nimble::getRawSizeFromRowVector(
529+
vector, velox::common::Ranges::of(0, size), context, /*topLevel=*/true);
521530
DWIO_ENSURE_GE(rawSize, 0, "Invalid raw size");
522531
context_->rawSize += rawSize;
532+
auto columnCount = context.columnCount();
533+
if (context_->columnStats.empty()) {
534+
context_->columnStats =
535+
std::vector<detail::WriterContext::ColumnStats>(columnCount);
536+
}
537+
for (auto i = 0; i < columnCount; ++i) {
538+
context_->columnStats[i].logicalSize += context.sizeAt(i);
539+
context_->columnStats[i].nullCount += context.nullsAt(i);
540+
}
523541

524542
if (context_->options.writeExecutor) {
525543
velox::dwio::common::ExecutorBarrier barrier{
@@ -580,6 +598,28 @@ void VeloxWriter::close() {
580598
builder.GetSize()});
581599
}
582600

601+
{
602+
// Accumulate column physical size.
603+
std::vector<uint64_t> columnPhysicalSize(
604+
context_->columnStats.size(), 0);
605+
nimble::StreamLabels streamLabels{nimble::SchemaReader::getSchema(
606+
context_->schemaBuilder.getSchemaNodes())};
607+
for (const auto& [offset, streamSize] : context_->streamPhysicalSize) {
608+
if (offset == 0) {
609+
continue;
610+
}
611+
std::vector<std::string> streamLabel;
612+
folly::split(
613+
'/',
614+
streamLabels.streamLabel(offset),
615+
streamLabel,
616+
/*ignoreEmpty=*/true);
617+
NIMBLE_ASSERT(!streamLabel.empty(), "Invalid stream label");
618+
auto column = std::stoi(streamLabel[0]);
619+
columnPhysicalSize[column] += streamSize;
620+
}
621+
}
622+
583623
{
584624
flatbuffers::FlatBufferBuilder builder;
585625
builder.Finish(serialization::CreateStats(builder, context_->rawSize));
@@ -691,14 +731,16 @@ void VeloxWriter::writeChunk(bool lastChunk) {
691731
StreamData& streamData_;
692732
};
693733

694-
auto encode = [&](StreamData& streamData) {
734+
auto encode = [&](StreamData& streamData,
735+
std::atomic<uint64_t>& streamSize) {
695736
const auto offset = streamData.descriptor().offset();
696737
auto encoded = encodeStream(*context_, *encodingBuffer_, streamData);
697738
if (!encoded.empty()) {
698739
ChunkedStreamWriter chunkWriter{*encodingBuffer_};
699740
NIMBLE_DASSERT(offset < streams_.size(), "Stream offset out of range.");
700741
auto& stream = streams_[offset];
701742
for (auto& buffer : chunkWriter.encode(encoded)) {
743+
streamSize += buffer.size();
702744
chunkSize += buffer.size();
703745
stream.content.push_back(std::move(buffer));
704746
}
@@ -739,29 +781,35 @@ void VeloxWriter::writeChunk(bool lastChunk) {
739781
velox::dwio::common::ExecutorBarrier barrier{
740782
context_->options.encodingExecutor};
741783
for (auto& streamData : context_->streams()) {
784+
auto& streamSize =
785+
context_->streamPhysicalSize[streamData->descriptor().offset()];
742786
processStream(
743787
*streamData, [&](StreamData& innerStreamData, bool isNullStream) {
744-
barrier.add([&innerStreamData, isNullStream, &encode]() {
745-
if (isNullStream) {
746-
NullsAsDataStreamData nullsStreamData{innerStreamData};
747-
encode(nullsStreamData);
748-
} else {
749-
encode(innerStreamData);
750-
}
751-
});
788+
barrier.add(
789+
[&innerStreamData, isNullStream, &encode, &streamSize]() {
790+
if (isNullStream) {
791+
NullsAsDataStreamData nullsStreamData{innerStreamData};
792+
encode(nullsStreamData, streamSize);
793+
} else {
794+
encode(innerStreamData, streamSize);
795+
}
796+
});
752797
});
753798
}
754799
barrier.waitAll();
755800
} else {
756801
for (auto& streamData : context_->streams()) {
802+
auto& streamSize =
803+
context_->streamPhysicalSize[streamData->descriptor().offset()];
757804
processStream(
758805
*streamData,
759-
[&encode](StreamData& innerStreamData, bool isNullStream) {
806+
[&encode, &streamSize](
807+
StreamData& innerStreamData, bool isNullStream) {
760808
if (isNullStream) {
761809
NullsAsDataStreamData nullsStreamData{innerStreamData};
762-
encode(nullsStreamData);
810+
encode(nullsStreamData, streamSize);
763811
} else {
764-
encode(innerStreamData);
812+
encode(innerStreamData, streamSize);
765813
}
766814
});
767815
}

0 commit comments

Comments
 (0)