Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions dwio/nimble/velox/RawSizeContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include "dwio/nimble/common/Exceptions.h"
#include "dwio/nimble/velox/DecodedVectorManager.h"

namespace facebook::nimble {
Expand All @@ -28,8 +29,65 @@ class RawSizeContext {
return decodedVectorManager_;
}

// Appends `size` as the entry for the next column in the per-column size
// list.
void appendSize(uint64_t size) {
  columnSizes_.emplace_back(size);
}

// Returns the size recorded for `columnIndex`. Asserts (NIMBLE_ASSERT) that
// the index is smaller than the number of recorded column sizes before the
// lookup.
uint64_t sizeAt(uint64_t columnIndex) const {
  const auto totalColumns = columnSizes_.size();
  NIMBLE_ASSERT(
      columnIndex < totalColumns,
      fmt::format(
          "Column index {} is out of range. Total number of columns is {}",
          columnIndex,
          totalColumns));
  return columnSizes_.at(columnIndex);
}

// Overwrites the size recorded for `columnIndex` with `size`. Asserts
// (NIMBLE_ASSERT) that the index is smaller than the number of recorded
// column sizes; this method never grows the list.
void setSizeAt(uint64_t columnIndex, uint64_t size) {
  const auto totalColumns = columnSizes_.size();
  NIMBLE_ASSERT(
      columnIndex < totalColumns,
      fmt::format(
          "Column index {} is out of range. Total number of columns is {}",
          columnIndex,
          totalColumns));
  auto& slot = columnSizes_[columnIndex];
  slot = size;
}

// Returns how many per-column sizes have been recorded so far.
uint64_t columnCount() const {
  const uint64_t recorded = columnSizes_.size();
  return recorded;
}

// Appends `nulls` as the entry for the next column in the per-column null
// count list.
void appendNullCount(uint64_t nulls) {
  columnNullCounts_.emplace_back(nulls);
}

// Returns the null count recorded for `columnIndex`. Asserts (NIMBLE_ASSERT)
// that the index is smaller than the number of recorded null counts before
// the lookup.
uint64_t nullsAt(uint64_t columnIndex) const {
  const auto totalColumns = columnNullCounts_.size();
  NIMBLE_ASSERT(
      columnIndex < totalColumns,
      fmt::format(
          "Column index {} is out of range. Total number of columns is {}",
          columnIndex,
          totalColumns));
  return columnNullCounts_.at(columnIndex);
}

// Overwrites the null count recorded for `columnIndex` with `nulls`. Asserts
// (NIMBLE_ASSERT) that the index is smaller than the number of recorded null
// counts; this method never grows the list.
void setNullsAt(uint64_t columnIndex, uint64_t nulls) {
  const auto totalColumns = columnNullCounts_.size();
  NIMBLE_ASSERT(
      columnIndex < totalColumns,
      fmt::format(
          "Column index {} is out of range. Total number of columns is {}",
          columnIndex,
          totalColumns));
  auto& slot = columnNullCounts_[columnIndex];
  slot = nulls;
}

// Number of nulls in last visited node
uint64_t nullCount{0};

private:
DecodedVectorManager decodedVectorManager_;
std::vector<uint64_t> columnSizes_;
std::vector<uint64_t> columnNullCounts_;
};

} // namespace facebook::nimble
50 changes: 43 additions & 7 deletions dwio/nimble/velox/RawSizeUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ uint64_t getRawSizeFromFixedWidthVector(
}
}

context.nullCount = nullCount;
return ((ranges.size() - nullCount) * sizeof(T)) +
(nullCount * NULL_SIZE);
}
Expand All @@ -67,6 +68,7 @@ uint64_t getRawSizeFromFixedWidthVector(
encoding,
vector->typeKind());

context.nullCount = constVector->mayHaveNulls() ? ranges.size() : 0;
return constVector->mayHaveNulls() ? ranges.size() * NULL_SIZE
: ranges.size() * sizeof(T);
}
Expand All @@ -92,6 +94,7 @@ uint64_t getRawSizeFromFixedWidthVector(
}
}

context.nullCount = nullCount;
return ((ranges.size() - nullCount) * sizeof(T)) +
(nullCount * NULL_SIZE);
}
Expand Down Expand Up @@ -132,6 +135,7 @@ uint64_t getRawSizeFromStringVector(
}
}

context.nullCount = nullCount;
return rawSize + (nullCount * NULL_SIZE);
}
case velox::VectorEncoding::Simple::CONSTANT: {
Expand All @@ -143,6 +147,7 @@ uint64_t getRawSizeFromStringVector(
encoding,
vector->typeKind());

context.nullCount = constVector->mayHaveNulls() ? ranges.size() : 0;
return constVector->mayHaveNulls()
? ranges.size() * NULL_SIZE
: ranges.size() * constVector->value().size();
Expand Down Expand Up @@ -179,6 +184,7 @@ uint64_t getRawSizeFromStringVector(
}
}

context.nullCount = nullCount;
return rawSize + (nullCount * NULL_SIZE);
}
default: {
Expand All @@ -190,7 +196,8 @@ uint64_t getRawSizeFromStringVector(
uint64_t getRawSizeFromConstantComplexVector(
const velox::VectorPtr& vector,
const velox::common::Ranges& ranges,
RawSizeContext& context) {
RawSizeContext& context,
bool topLevelRow = false) {
VELOX_CHECK_NOT_NULL(vector);
VELOX_DCHECK(
velox::VectorEncoding::Simple::CONSTANT == vector->encoding(),
Expand All @@ -199,7 +206,7 @@ uint64_t getRawSizeFromConstantComplexVector(
const auto* constantVector =
vector->as<velox::ConstantVector<velox::ComplexType>>();
VELOX_CHECK_NOT_NULL(
vector,
constantVector,
"Encoding mismatch on ConstantVector. Encoding: {}. TypeKind: {}.",
vector->encoding(),
vector->typeKind());
Expand All @@ -209,8 +216,22 @@ uint64_t getRawSizeFromConstantComplexVector(
velox::common::Ranges childRanges;
childRanges.add(index, index + 1);

uint64_t rawSize = getRawSizeFromVector(valueVector, childRanges, context);

uint64_t rawSize = 0;
if (topLevelRow) {
VELOX_CHECK_EQ(
velox::TypeKind::ROW,
valueVector->typeKind(),
"Value vector should be a RowVector");
rawSize = getRawSizeFromRowVector(
valueVector, childRanges, context, /*topLevel=*/true);
for (int idx = 0; idx < context.columnCount(); ++idx) {
context.setSizeAt(idx, context.sizeAt(idx) * ranges.size());
context.setNullsAt(idx, context.nullsAt(idx) * ranges.size());
}
} else {
rawSize = getRawSizeFromVector(valueVector, childRanges, context);
}
context.nullCount = constantVector->mayHaveNulls() ? ranges.size() : 0;
return rawSize * ranges.size();
}

Expand Down Expand Up @@ -319,6 +340,7 @@ uint64_t getRawSizeFromArrayVector(
getRawSizeFromVector(arrayVector->elements(), childRanges, context);
}

context.nullCount = nullCount;
if (nullCount) {
rawSize += nullCount * NULL_SIZE;
}
Expand Down Expand Up @@ -432,6 +454,7 @@ uint64_t getRawSizeFromMapVector(
getRawSizeFromVector(mapVector->mapValues(), childRanges, context);
}

context.nullCount = nullCount;
if (nullCount) {
rawSize += nullCount * NULL_SIZE;
}
Expand All @@ -442,7 +465,8 @@ uint64_t getRawSizeFromMapVector(
uint64_t getRawSizeFromRowVector(
const velox::VectorPtr& vector,
const velox::common::Ranges& ranges,
RawSizeContext& context) {
RawSizeContext& context,
const bool topLevel) {
VELOX_CHECK_NOT_NULL(vector);
const auto& encoding = vector->encoding();
const velox::RowVector* rowVector;
Expand Down Expand Up @@ -477,7 +501,8 @@ uint64_t getRawSizeFromRowVector(
break;
}
case velox::VectorEncoding::Simple::CONSTANT: {
return getRawSizeFromConstantComplexVector(vector, ranges, context);
return getRawSizeFromConstantComplexVector(
vector, ranges, context, topLevel);
}
case velox::VectorEncoding::Simple::DICTIONARY: {
const auto* dictionaryRowVector =
Expand Down Expand Up @@ -528,11 +553,22 @@ uint64_t getRawSizeFromRowVector(
if ((*childRangesPtr).size()) {
const auto childrenSize = rowVector->childrenSize();
for (size_t i = 0; i < childrenSize; ++i) {
rawSize +=
auto childRawSize =
getRawSizeFromVector(rowVector->childAt(i), *childRangesPtr, context);
rawSize += childRawSize;
if (topLevel) {
context.appendSize(childRawSize);
context.appendNullCount(context.nullCount);
}
}
} else if (topLevel) {
for (size_t i = 0; i < rowVector->childrenSize(); ++i) {
context.appendSize(0);
context.appendNullCount(0);
}
}

context.nullCount = nullCount;
if (nullCount) {
rawSize += nullCount * NULL_SIZE;
}
Expand Down
5 changes: 5 additions & 0 deletions dwio/nimble/velox/RawSizeUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,9 @@ uint64_t getRawSizeFromVector(
const velox::VectorPtr& vector,
const velox::common::Ranges& ranges);

// Returns the raw size of the row vector `vector` restricted to `ranges`.
// `context` carries the null count of the last visited node. When `topLevel`
// is true, the definition additionally appends one raw size and one null
// count per child of the row to `context` (zeros for every child when there
// are no child ranges to visit), so callers can read per-column values via
// RawSizeContext::sizeAt / nullsAt afterwards.
uint64_t getRawSizeFromRowVector(
const velox::VectorPtr& vector,
const velox::common::Ranges& ranges,
RawSizeContext& context,
const bool topLevel = false);
} // namespace facebook::nimble
2 changes: 1 addition & 1 deletion dwio/nimble/velox/StreamLabels.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void addLabels(
const auto offset = row.nullsDescriptor().offset();
NIMBLE_DASSERT(labelIndex < labels.size(), "Unexpected label index.");
NIMBLE_DASSERT(offsetToLabel.size() > offset, "Unexpected offset.");
labels.push_back(labels[labelIndex] + "/");
labels.push_back(labels[labelIndex] + name + "/");
labelIndex = labels.size() - 1;
offsetToLabel[offset] = labelIndex;
for (auto i = 0; i < row.childrenCount(); ++i) {
Expand Down
82 changes: 68 additions & 14 deletions dwio/nimble/velox/VeloxWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "dwio/nimble/velox/SchemaSerialization.h"
#include "dwio/nimble/velox/SchemaTypes.h"
#include "dwio/nimble/velox/StatsGenerated.h"
#include "dwio/nimble/velox/StreamLabels.h"
#include "folly/ScopeGuard.h"
#include "velox/common/time/CpuWallTimer.h"
#include "velox/dwio/common/ExecutorBarrier.h"
Expand All @@ -47,6 +48,11 @@ namespace detail {

class WriterContext : public FieldWriterContext {
public:
// Per-column totals kept by the writer context; both counters start at zero.
struct ColumnStats {
  // Running sum of the per-column raw sizes reported for each written vector.
  uint64_t logicalSize = 0;
  // Running sum of the per-column null counts reported for each written
  // vector.
  uint64_t nullCount = 0;
};

const VeloxWriterOptions options;
std::unique_ptr<FlushPolicy> flushPolicy;
velox::CpuWallTiming totalFlushTiming;
Expand All @@ -63,6 +69,8 @@ class WriterContext : public FieldWriterContext {
uint64_t stripeSize{0};
uint64_t rawSize{0};
std::vector<uint64_t> rowsPerStripe;
std::unordered_map<offset_size, std::atomic<uint64_t>> streamPhysicalSize;
std::vector<ColumnStats> columnStats;

WriterContext(
velox::memory::MemoryPool& memoryPool,
Expand Down Expand Up @@ -516,10 +524,20 @@ bool VeloxWriter::write(const velox::VectorPtr& vector) {
auto size = vector->size();

// Calculate raw size.
auto rawSize = nimble::getRawSizeFromVector(
vector, velox::common::Ranges::of(0, size));
RawSizeContext context;
auto rawSize = nimble::getRawSizeFromRowVector(
vector, velox::common::Ranges::of(0, size), context, /*topLevel=*/true);
DWIO_ENSURE_GE(rawSize, 0, "Invalid raw size");
context_->rawSize += rawSize;
auto columnCount = context.columnCount();
if (context_->columnStats.empty()) {
context_->columnStats =
std::vector<detail::WriterContext::ColumnStats>(columnCount);
}
for (auto i = 0; i < columnCount; ++i) {
context_->columnStats[i].logicalSize += context.sizeAt(i);
context_->columnStats[i].nullCount += context.nullsAt(i);
}

if (context_->options.writeExecutor) {
velox::dwio::common::ExecutorBarrier barrier{
Expand Down Expand Up @@ -580,6 +598,34 @@ void VeloxWriter::close() {
builder.GetSize()});
}

{
// Accumulate column physical size.
// For every stream offset with a recorded physical size: resolve the
// stream's label, split it on '/', parse the first non-empty component as
// the column index, and add the stream's size to that column's total.
// NOTE(review): columnPhysicalSize is declared inside this scope and is not
// read after the loop, so the totals are discarded when the scope closes —
// confirm whether they are meant to be persisted (e.g. alongside
// columnStats).
std::vector<uint64_t> columnPhysicalSize(
context_->columnStats.size(), 0);
nimble::StreamLabels streamLabels{nimble::SchemaReader::getSchema(
context_->schemaBuilder.getSchemaNodes())};
for (const auto& [offset, streamSize] : context_->streamPhysicalSize) {
// Offset 0 is not attributed to any column — presumably it is the
// top-level row's own stream; TODO confirm.
if (offset == 0) {
continue;
}
std::vector<std::string> streamLabel;
folly::split(
'/',
streamLabels.streamLabel(offset),
streamLabel,
/*ignoreEmpty=*/true);
NIMBLE_DASSERT(!streamLabel.empty(), "Invalid stream label");
// NOTE(review): std::stoi yields a signed int and throws if the component
// is not numeric; the range check below compares it against an unsigned
// size. This assumes top-level labels are column ordinals — confirm.
auto column = std::stoi(streamLabel[0]);
NIMBLE_DASSERT(
column < columnPhysicalSize.size(),
fmt::format(
"Index {} is out of range for physical size vector of size {}",
column,
columnPhysicalSize.size()));
// streamSize is a std::atomic<uint64_t>; its current value is read
// implicitly here.
columnPhysicalSize[column] += streamSize;
}
}

{
flatbuffers::FlatBufferBuilder builder;
builder.Finish(serialization::CreateStats(builder, context_->rawSize));
Expand Down Expand Up @@ -691,14 +737,16 @@ void VeloxWriter::writeChunk(bool lastChunk) {
StreamData& streamData_;
};

auto encode = [&](StreamData& streamData) {
auto encode = [&](StreamData& streamData,
std::atomic<uint64_t>& streamSize) {
const auto offset = streamData.descriptor().offset();
auto encoded = encodeStream(*context_, *encodingBuffer_, streamData);
if (!encoded.empty()) {
ChunkedStreamWriter chunkWriter{*encodingBuffer_};
NIMBLE_DASSERT(offset < streams_.size(), "Stream offset out of range.");
auto& stream = streams_[offset];
for (auto& buffer : chunkWriter.encode(encoded)) {
streamSize += buffer.size();
chunkSize += buffer.size();
stream.content.push_back(std::move(buffer));
}
Expand Down Expand Up @@ -739,29 +787,35 @@ void VeloxWriter::writeChunk(bool lastChunk) {
velox::dwio::common::ExecutorBarrier barrier{
context_->options.encodingExecutor};
for (auto& streamData : context_->streams()) {
auto& streamSize =
context_->streamPhysicalSize[streamData->descriptor().offset()];
processStream(
*streamData, [&](StreamData& innerStreamData, bool isNullStream) {
barrier.add([&innerStreamData, isNullStream, &encode]() {
if (isNullStream) {
NullsAsDataStreamData nullsStreamData{innerStreamData};
encode(nullsStreamData);
} else {
encode(innerStreamData);
}
});
barrier.add(
[&innerStreamData, isNullStream, &encode, &streamSize]() {
if (isNullStream) {
NullsAsDataStreamData nullsStreamData{innerStreamData};
encode(nullsStreamData, streamSize);
} else {
encode(innerStreamData, streamSize);
}
});
});
}
barrier.waitAll();
} else {
for (auto& streamData : context_->streams()) {
auto& streamSize =
context_->streamPhysicalSize[streamData->descriptor().offset()];
processStream(
*streamData,
[&encode](StreamData& innerStreamData, bool isNullStream) {
[&encode, &streamSize](
StreamData& innerStreamData, bool isNullStream) {
if (isNullStream) {
NullsAsDataStreamData nullsStreamData{innerStreamData};
encode(nullsStreamData);
encode(nullsStreamData, streamSize);
} else {
encode(innerStreamData);
encode(innerStreamData, streamSize);
}
});
}
Expand Down
Loading
Loading