3636#include  " dwio/nimble/velox/SchemaSerialization.h" 
3737#include  " dwio/nimble/velox/SchemaTypes.h" 
3838#include  " dwio/nimble/velox/StatsGenerated.h" 
39+ #include  " dwio/nimble/velox/StreamLabels.h" 
3940#include  " folly/ScopeGuard.h" 
4041#include  " velox/common/time/CpuWallTimer.h" 
4142#include  " velox/dwio/common/ExecutorBarrier.h" 
@@ -47,6 +48,11 @@ namespace detail {
4748
4849class  WriterContext  : public  FieldWriterContext  {
4950 public: 
51+   struct  ColumnStats  {
52+     uint64_t  logicalSize{0 };
53+     uint64_t  nullCount{0 };
54+   };
55+ 
5056  const  VeloxWriterOptions options;
5157  std::unique_ptr<FlushPolicy> flushPolicy;
5258  velox::CpuWallTiming totalFlushTiming;
@@ -63,6 +69,8 @@ class WriterContext : public FieldWriterContext {
6369  uint64_t  stripeSize{0 };
6470  uint64_t  rawSize{0 };
6571  std::vector<uint64_t > rowsPerStripe;
72+   std::unordered_map<offset_size, std::atomic<uint64_t >> streamPhysicalSize;
73+   std::vector<ColumnStats> columnStats;
6674
6775  WriterContext (
6876      velox::memory::MemoryPool& memoryPool,
@@ -516,10 +524,20 @@ bool VeloxWriter::write(const velox::VectorPtr& vector) {
516524    auto  size = vector->size ();
517525
518526    //  Calculate raw size.
519-     auto  rawSize = nimble::getRawSizeFromVector (
520-         vector, velox::common::Ranges::of (0 , size));
527+     RawSizeContext context;
528+     auto  rawSize = nimble::getRawSizeFromRowVector (
529+         vector, velox::common::Ranges::of (0 , size), context, /* topLevel=*/ true );
521530    DWIO_ENSURE_GE (rawSize, 0 , " Invalid raw size" 
522531    context_->rawSize  += rawSize;
532+     auto  columnCount = context.columnCount ();
533+     if  (context_->columnStats .empty ()) {
534+       context_->columnStats  =
535+           std::vector<detail::WriterContext::ColumnStats>(columnCount);
536+     }
537+     for  (auto  i = 0 ; i < columnCount; ++i) {
538+       context_->columnStats [i].logicalSize  += context.sizeAt (i);
539+       context_->columnStats [i].nullCount  += context.nullsAt (i);
540+     }
523541
524542    if  (context_->options .writeExecutor ) {
525543      velox::dwio::common::ExecutorBarrier barrier{
@@ -580,6 +598,28 @@ void VeloxWriter::close() {
580598             builder.GetSize ()});
581599      }
582600
601+       {
602+         //  Accumulate column physical size.
603+         std::vector<uint64_t > columnPhysicalSize (
604+             context_->columnStats .size (), 0 );
605+         nimble::StreamLabels streamLabels{nimble::SchemaReader::getSchema (
606+             context_->schemaBuilder .getSchemaNodes ())};
607+         for  (const  auto & [offset, streamSize] : context_->streamPhysicalSize ) {
608+           if  (offset == 0 ) {
609+             continue ;
610+           }
611+           std::vector<std::string> streamLabel;
612+           folly::split (
613+               ' /' 
614+               streamLabels.streamLabel (offset),
615+               streamLabel,
616+               /* ignoreEmpty=*/ true );
617+           NIMBLE_ASSERT (!streamLabel.empty (), " Invalid stream label" 
618+           auto  column = std::stoi (streamLabel[0 ]);
619+           columnPhysicalSize[column] += streamSize;
620+         }
621+       }
622+ 
583623      {
584624        flatbuffers::FlatBufferBuilder builder;
585625        builder.Finish (serialization::CreateStats (builder, context_->rawSize ));
@@ -691,14 +731,16 @@ void VeloxWriter::writeChunk(bool lastChunk) {
691731      StreamData& streamData_;
692732    };
693733
694-     auto  encode = [&](StreamData& streamData) {
734+     auto  encode = [&](StreamData& streamData,
735+                       std::atomic<uint64_t >& streamSize) {
695736      const  auto  offset = streamData.descriptor ().offset ();
696737      auto  encoded = encodeStream (*context_, *encodingBuffer_, streamData);
697738      if  (!encoded.empty ()) {
698739        ChunkedStreamWriter chunkWriter{*encodingBuffer_};
699740        NIMBLE_DASSERT (offset < streams_.size (), " Stream offset out of range." 
700741        auto & stream = streams_[offset];
701742        for  (auto & buffer : chunkWriter.encode (encoded)) {
743+           streamSize += buffer.size ();
702744          chunkSize += buffer.size ();
703745          stream.content .push_back (std::move (buffer));
704746        }
@@ -739,29 +781,35 @@ void VeloxWriter::writeChunk(bool lastChunk) {
739781      velox::dwio::common::ExecutorBarrier barrier{
740782          context_->options .encodingExecutor };
741783      for  (auto & streamData : context_->streams ()) {
784+         auto & streamSize =
785+             context_->streamPhysicalSize [streamData->descriptor ().offset ()];
742786        processStream (
743787            *streamData, [&](StreamData& innerStreamData, bool  isNullStream) {
744-               barrier.add ([&innerStreamData, isNullStream, &encode]() {
745-                 if  (isNullStream) {
746-                   NullsAsDataStreamData nullsStreamData{innerStreamData};
747-                   encode (nullsStreamData);
748-                 } else  {
749-                   encode (innerStreamData);
750-                 }
751-               });
788+               barrier.add (
789+                   [&innerStreamData, isNullStream, &encode, &streamSize]() {
790+                     if  (isNullStream) {
791+                       NullsAsDataStreamData nullsStreamData{innerStreamData};
792+                       encode (nullsStreamData, streamSize);
793+                     } else  {
794+                       encode (innerStreamData, streamSize);
795+                     }
796+                   });
752797            });
753798      }
754799      barrier.waitAll ();
755800    } else  {
756801      for  (auto & streamData : context_->streams ()) {
802+         auto & streamSize =
803+             context_->streamPhysicalSize [streamData->descriptor ().offset ()];
757804        processStream (
758805            *streamData,
759-             [&encode](StreamData& innerStreamData, bool  isNullStream) {
806+             [&encode, &streamSize](
807+                 StreamData& innerStreamData, bool  isNullStream) {
760808              if  (isNullStream) {
761809                NullsAsDataStreamData nullsStreamData{innerStreamData};
762-                 encode (nullsStreamData);
810+                 encode (nullsStreamData, streamSize );
763811              } else  {
764-                 encode (innerStreamData);
812+                 encode (innerStreamData, streamSize );
765813              }
766814            });
767815      }
0 commit comments