36
36
#include " dwio/nimble/velox/SchemaSerialization.h"
37
37
#include " dwio/nimble/velox/SchemaTypes.h"
38
38
#include " dwio/nimble/velox/StatsGenerated.h"
39
+ #include " dwio/nimble/velox/StreamLabels.h"
39
40
#include " folly/ScopeGuard.h"
40
41
#include " velox/common/time/CpuWallTimer.h"
41
42
#include " velox/dwio/common/ExecutorBarrier.h"
@@ -47,6 +48,11 @@ namespace detail {
47
48
48
49
class WriterContext : public FieldWriterContext {
49
50
public:
51
+ struct ColumnStats {
52
+ uint64_t logicalSize{0 };
53
+ uint64_t nullCount{0 };
54
+ };
55
+
50
56
const VeloxWriterOptions options;
51
57
std::unique_ptr<FlushPolicy> flushPolicy;
52
58
velox::CpuWallTiming totalFlushTiming;
@@ -63,6 +69,8 @@ class WriterContext : public FieldWriterContext {
63
69
uint64_t stripeSize{0 };
64
70
uint64_t rawSize{0 };
65
71
std::vector<uint64_t > rowsPerStripe;
72
+ std::unordered_map<offset_size, std::atomic<uint64_t >> streamPhysicalSize;
73
+ std::vector<ColumnStats> columnStats;
66
74
67
75
WriterContext (
68
76
velox::memory::MemoryPool& memoryPool,
@@ -516,10 +524,20 @@ bool VeloxWriter::write(const velox::VectorPtr& vector) {
516
524
auto size = vector->size ();
517
525
518
526
// Calculate raw size.
519
- auto rawSize = nimble::getRawSizeFromVector (
520
- vector, velox::common::Ranges::of (0 , size));
527
+ RawSizeContext context;
528
+ auto rawSize = nimble::getRawSizeFromRowVector (
529
+ vector, velox::common::Ranges::of (0 , size), context, /* topLevel=*/ true );
521
530
DWIO_ENSURE_GE (rawSize, 0 , " Invalid raw size" );
522
531
context_->rawSize += rawSize;
532
+ auto columnCount = context.columnCount ();
533
+ if (context_->columnStats .empty ()) {
534
+ context_->columnStats =
535
+ std::vector<detail::WriterContext::ColumnStats>(columnCount);
536
+ }
537
+ for (auto i = 0 ; i < columnCount; ++i) {
538
+ context_->columnStats [i].logicalSize += context.sizeAt (i);
539
+ context_->columnStats [i].nullCount += context.nullsAt (i);
540
+ }
523
541
524
542
if (context_->options .writeExecutor ) {
525
543
velox::dwio::common::ExecutorBarrier barrier{
@@ -580,6 +598,28 @@ void VeloxWriter::close() {
580
598
builder.GetSize ()});
581
599
}
582
600
601
+ {
602
+ // Accumulate column physical size.
603
+ std::vector<uint64_t > columnPhysicalSize (
604
+ context_->columnStats .size (), 0 );
605
+ nimble::StreamLabels streamLabels{nimble::SchemaReader::getSchema (
606
+ context_->schemaBuilder .getSchemaNodes ())};
607
+ for (const auto & [offset, streamSize] : context_->streamPhysicalSize ) {
608
+ if (offset == 0 ) {
609
+ continue ;
610
+ }
611
+ std::vector<std::string> streamLabel;
612
+ folly::split (
613
+ ' /' ,
614
+ streamLabels.streamLabel (offset),
615
+ streamLabel,
616
+ /* ignoreEmpty=*/ true );
617
+ NIMBLE_ASSERT (!streamLabel.empty (), " Invalid stream label" );
618
+ auto column = std::stoi (streamLabel[0 ]);
619
+ columnPhysicalSize[column] += streamSize;
620
+ }
621
+ }
622
+
583
623
{
584
624
flatbuffers::FlatBufferBuilder builder;
585
625
builder.Finish (serialization::CreateStats (builder, context_->rawSize ));
@@ -691,14 +731,16 @@ void VeloxWriter::writeChunk(bool lastChunk) {
691
731
StreamData& streamData_;
692
732
};
693
733
694
- auto encode = [&](StreamData& streamData) {
734
+ auto encode = [&](StreamData& streamData,
735
+ std::atomic<uint64_t >& streamSize) {
695
736
const auto offset = streamData.descriptor ().offset ();
696
737
auto encoded = encodeStream (*context_, *encodingBuffer_, streamData);
697
738
if (!encoded.empty ()) {
698
739
ChunkedStreamWriter chunkWriter{*encodingBuffer_};
699
740
NIMBLE_DASSERT (offset < streams_.size (), " Stream offset out of range." );
700
741
auto & stream = streams_[offset];
701
742
for (auto & buffer : chunkWriter.encode (encoded)) {
743
+ streamSize += buffer.size ();
702
744
chunkSize += buffer.size ();
703
745
stream.content .push_back (std::move (buffer));
704
746
}
@@ -739,29 +781,35 @@ void VeloxWriter::writeChunk(bool lastChunk) {
739
781
velox::dwio::common::ExecutorBarrier barrier{
740
782
context_->options .encodingExecutor };
741
783
for (auto & streamData : context_->streams ()) {
784
+ auto & streamSize =
785
+ context_->streamPhysicalSize [streamData->descriptor ().offset ()];
742
786
processStream (
743
787
*streamData, [&](StreamData& innerStreamData, bool isNullStream) {
744
- barrier.add ([&innerStreamData, isNullStream, &encode]() {
745
- if (isNullStream) {
746
- NullsAsDataStreamData nullsStreamData{innerStreamData};
747
- encode (nullsStreamData);
748
- } else {
749
- encode (innerStreamData);
750
- }
751
- });
788
+ barrier.add (
789
+ [&innerStreamData, isNullStream, &encode, &streamSize]() {
790
+ if (isNullStream) {
791
+ NullsAsDataStreamData nullsStreamData{innerStreamData};
792
+ encode (nullsStreamData, streamSize);
793
+ } else {
794
+ encode (innerStreamData, streamSize);
795
+ }
796
+ });
752
797
});
753
798
}
754
799
barrier.waitAll ();
755
800
} else {
756
801
for (auto & streamData : context_->streams ()) {
802
+ auto & streamSize =
803
+ context_->streamPhysicalSize [streamData->descriptor ().offset ()];
757
804
processStream (
758
805
*streamData,
759
- [&encode](StreamData& innerStreamData, bool isNullStream) {
806
+ [&encode, &streamSize](
807
+ StreamData& innerStreamData, bool isNullStream) {
760
808
if (isNullStream) {
761
809
NullsAsDataStreamData nullsStreamData{innerStreamData};
762
- encode (nullsStreamData);
810
+ encode (nullsStreamData, streamSize );
763
811
} else {
764
- encode (innerStreamData);
812
+ encode (innerStreamData, streamSize );
765
813
}
766
814
});
767
815
}
0 commit comments