@@ -47,6 +47,11 @@ namespace detail {
47
47
48
48
class WriterContext : public FieldWriterContext {
49
49
public:
50
+ struct ColumnStats {
51
+ uint64_t logicalSize{0 };
52
+ uint64_t nullCount{0 };
53
+ };
54
+
50
55
const VeloxWriterOptions options;
51
56
std::unique_ptr<FlushPolicy> flushPolicy;
52
57
velox::CpuWallTiming totalFlushTiming;
@@ -63,6 +68,8 @@ class WriterContext : public FieldWriterContext {
63
68
uint64_t stripeSize{0 };
64
69
uint64_t rawSize{0 };
65
70
std::vector<uint64_t > rowsPerStripe;
71
+ std::map<offset_size, std::atomic<uint64_t >> streamPhysicalSize;
72
+ std::vector<ColumnStats> columnStats;
66
73
67
74
WriterContext (
68
75
velox::memory::MemoryPool& memoryPool,
@@ -516,10 +523,20 @@ bool VeloxWriter::write(const velox::VectorPtr& vector) {
516
523
auto size = vector->size ();
517
524
518
525
// Calculate raw size.
519
- auto rawSize = nimble::getRawSizeFromVector (
520
- vector, velox::common::Ranges::of (0 , size));
526
+ RawSizeContext context;
527
+ auto rawSize = nimble::getRawSizeFromRowVector (
528
+ vector, velox::common::Ranges::of (0 , size), context, /* topLevel=*/ true );
521
529
DWIO_ENSURE_GE (rawSize, 0 , " Invalid raw size" );
522
530
context_->rawSize += rawSize;
531
+ auto columnCount = context.columnCount ();
532
+ if (context_->columnStats .empty ()) {
533
+ context_->columnStats =
534
+ std::vector<detail::WriterContext::ColumnStats>(columnCount);
535
+ }
536
+ for (auto i = 0 ; i < columnCount; ++i) {
537
+ context_->columnStats [i].logicalSize += context.sizeAt (i);
538
+ context_->columnStats [i].nullCount += context.nullsAt (i);
539
+ }
523
540
524
541
if (context_->options .writeExecutor ) {
525
542
velox::dwio::common::ExecutorBarrier barrier{
@@ -580,6 +597,32 @@ void VeloxWriter::close() {
580
597
builder.GetSize ()});
581
598
}
582
599
600
+ {
601
+ // accumulate column physical size
602
+ // TODO: Clean up unnecessary logs
603
+ auto columnCount = context_->columnStats .size ();
604
+ const auto & physicalSize = context_->streamPhysicalSize ;
605
+ std::vector<uint64_t > columnPhysicalSize (columnCount, 0 );
606
+ for (const auto & node : context_->schemaBuilder .getSchemaNodes ()) {
607
+ auto column = node->parentNodeOffset ();
608
+ offset_size offset = node->offset ();
609
+ if (physicalSize.contains (offset) && column.has_value ()) {
610
+ columnPhysicalSize[column.value ()] += physicalSize.at (offset);
611
+ } else {
612
+ LOG (INFO) << " Nothing for stream " << offset;
613
+ }
614
+ }
615
+ // LOG column stats
616
+ for (int i = 0 ; i < columnCount; ++i) {
617
+ LOG (INFO) << fmt::format (
618
+ " Column {} logical size: {}, physical size: {}, null count: {}" ,
619
+ i,
620
+ context_->columnStats [i].logicalSize ,
621
+ columnPhysicalSize[i],
622
+ context_->columnStats [i].nullCount );
623
+ }
624
+ }
625
+
583
626
{
584
627
flatbuffers::FlatBufferBuilder builder;
585
628
builder.Finish (serialization::CreateStats (builder, context_->rawSize ));
@@ -698,10 +741,13 @@ void VeloxWriter::writeChunk(bool lastChunk) {
698
741
ChunkedStreamWriter chunkWriter{*encodingBuffer_};
699
742
NIMBLE_DASSERT (offset < streams_.size (), " Stream offset out of range." );
700
743
auto & stream = streams_[offset];
744
+ uint64_t streamSize = 0 ;
701
745
for (auto & buffer : chunkWriter.encode (encoded)) {
702
- chunkSize += buffer.size ();
746
+ streamSize += buffer.size ();
703
747
stream.content .push_back (std::move (buffer));
704
748
}
749
+ context_->streamPhysicalSize [offset].fetch_add (streamSize);
750
+ chunkSize += streamSize;
705
751
}
706
752
streamData.reset ();
707
753
};
@@ -778,8 +824,9 @@ void VeloxWriter::writeChunk(bool lastChunk) {
778
824
auto flushWallTimeMs =
779
825
(context_->stripeFlushTiming .wallNanos - previousFlushWallTime) /
780
826
1'000'000 ;
781
- VLOG (1 ) << " writeChunk milliseconds: " << flushWallTimeMs
782
- << " , chunk bytes: " << chunkSize;
827
+ // TODO: Clean up unnecessary logs
828
+ LOG (INFO) << " writeChunk milliseconds: " << flushWallTimeMs
829
+ << " , chunk bytes: " << chunkSize;
783
830
}
784
831
785
832
uint32_t VeloxWriter::writeStripe () {
0 commit comments