@@ -47,6 +47,11 @@ namespace detail {
 
 class WriterContext : public FieldWriterContext {
  public:
+  struct ColumnStats {
+    uint64_t logicalSize{0};
+    uint64_t nullCount{0};
+  };
+
   const VeloxWriterOptions options;
   std::unique_ptr<FlushPolicy> flushPolicy;
   velox::CpuWallTiming totalFlushTiming;
@@ -63,6 +68,8 @@ class WriterContext : public FieldWriterContext {
   uint64_t stripeSize{0};
   uint64_t rawSize{0};
   std::vector<uint64_t> rowsPerStripe;
+  std::map<offset_size, std::atomic<uint64_t>> streamPhysicalSize;
+  std::vector<ColumnStats> columnStats;
 
   WriterContext(
       velox::memory::MemoryPool& memoryPool,
@@ -516,10 +523,21 @@ bool VeloxWriter::write(const velox::VectorPtr& vector) {
   auto size = vector->size();
 
   // Calculate raw size.
-  auto rawSize = nimble::getRawSizeFromVector(
-      vector, velox::common::Ranges::of(0, size));
+  RawSizeContext context;
+  auto rawSize = nimble::getRawSizeFromRowVector(
+      vector, velox::common::Ranges::of(0, size), context, /*topLevel=*/true);
+  LOG(INFO) << "Raw size: " << rawSize;
   DWIO_ENSURE_GE(rawSize, 0, "Invalid raw size");
   context_->rawSize += rawSize;
+  auto columnCount = context.columnCount();
+  if (context_->columnStats.empty()) {
+    context_->columnStats =
+        std::vector<detail::WriterContext::ColumnStats>(columnCount);
+  }
+  for (auto i = 0; i < columnCount; ++i) {
+    context_->columnStats[i].logicalSize += context.sizeAt(i);
+    context_->columnStats[i].nullCount += context.nullsAt(i);
+  }
 
   if (context_->options.writeExecutor) {
     velox::dwio::common::ExecutorBarrier barrier{
@@ -580,6 +598,26 @@ void VeloxWriter::close() {
          builder.GetSize()});
   }
 
+  {
+    // Accumulate column physical size.
+    std::vector<uint64_t> columnPhysicalSize(
+        context_->columnStats.size(), 0);
+    const auto& physicalSize = context_->streamPhysicalSize;
+    for (const auto& node : context_->schemaBuilder.getSchemaNodes()) {
+      auto column = node->column();
+      offset_size offset = node->offset();
+      if (physicalSize.contains(offset) && column.has_value()) {
+        NIMBLE_CHECK(
+            column.value() < columnPhysicalSize.size(),
+            fmt::format(
+                "Column {} is out of range. Schema has {} columns.",
+                column.value(),
+                columnPhysicalSize.size()));
+        columnPhysicalSize[column.value()] += physicalSize.at(offset);
+      }
+    }
+  }
+
   {
     flatbuffers::FlatBufferBuilder builder;
     builder.Finish(serialization::CreateStats(builder, context_->rawSize));
@@ -691,14 +729,16 @@ void VeloxWriter::writeChunk(bool lastChunk) {
     StreamData& streamData_;
   };
 
-  auto encode = [&](StreamData& streamData) {
+  auto encode = [&](StreamData& streamData,
+                    std::atomic<uint64_t>& streamSize) {
     const auto offset = streamData.descriptor().offset();
     auto encoded = encodeStream(*context_, *encodingBuffer_, streamData);
     if (!encoded.empty()) {
       ChunkedStreamWriter chunkWriter{*encodingBuffer_};
       NIMBLE_DASSERT(offset < streams_.size(), "Stream offset out of range.");
       auto& stream = streams_[offset];
       for (auto& buffer : chunkWriter.encode(encoded)) {
+        streamSize += buffer.size();
         chunkSize += buffer.size();
         stream.content.push_back(std::move(buffer));
       }
@@ -739,29 +779,35 @@ void VeloxWriter::writeChunk(bool lastChunk) {
     velox::dwio::common::ExecutorBarrier barrier{
         context_->options.encodingExecutor};
     for (auto& streamData : context_->streams()) {
+      auto& streamSize =
+          context_->streamPhysicalSize[streamData->descriptor().offset()];
       processStream(
           *streamData, [&](StreamData& innerStreamData, bool isNullStream) {
-            barrier.add([&innerStreamData, isNullStream, &encode]() {
-              if (isNullStream) {
-                NullsAsDataStreamData nullsStreamData{innerStreamData};
-                encode(nullsStreamData);
-              } else {
-                encode(innerStreamData);
-              }
-            });
+            barrier.add(
+                [&innerStreamData, isNullStream, &encode, &streamSize]() {
+                  if (isNullStream) {
+                    NullsAsDataStreamData nullsStreamData{innerStreamData};
+                    encode(nullsStreamData, streamSize);
+                  } else {
+                    encode(innerStreamData, streamSize);
+                  }
+                });
           });
     }
     barrier.waitAll();
   } else {
     for (auto& streamData : context_->streams()) {
+      auto& streamSize =
+          context_->streamPhysicalSize[streamData->descriptor().offset()];
       processStream(
           *streamData,
-          [&encode](StreamData& innerStreamData, bool isNullStream) {
+          [&encode, &streamSize](
+              StreamData& innerStreamData, bool isNullStream) {
             if (isNullStream) {
               NullsAsDataStreamData nullsStreamData{innerStreamData};
-              encode(nullsStreamData);
+              encode(nullsStreamData, streamSize);
             } else {
-              encode(innerStreamData);
+              encode(innerStreamData, streamSize);
             }
           });
     }
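
For readers skimming the diff: the new streamPhysicalSize map is filled per stream while chunks are encoded, and close() then folds those stream totals into per-column physical sizes via the schema nodes. Below is a minimal standalone sketch of that fold, with a hypothetical Node struct and a plain std::map standing in for Nimble's schema nodes and the streamPhysicalSize member; it illustrates the aggregation pattern only and is not the Nimble API.

// Standalone sketch: fold per-stream physical sizes into per-column totals,
// keyed by the top-level column index each (hypothetical) schema node reports.
#include <cstdint>
#include <map>
#include <optional>
#include <vector>

struct Node {
  uint32_t offset;                 // stream offset within the file (stand-in)
  std::optional<uint32_t> column;  // owning top-level column, if any
};

std::vector<uint64_t> aggregateColumnPhysicalSize(
    const std::vector<Node>& nodes,
    const std::map<uint32_t, uint64_t>& streamPhysicalSize,
    size_t columnCount) {
  std::vector<uint64_t> columnPhysicalSize(columnCount, 0);
  for (const auto& node : nodes) {
    auto it = streamPhysicalSize.find(node.offset);
    // Skip streams that were never written and nodes not owned by a column;
    // the bounds check mirrors the NIMBLE_CHECK in the diff.
    if (it != streamPhysicalSize.end() && node.column.has_value() &&
        node.column.value() < columnPhysicalSize.size()) {
      columnPhysicalSize[node.column.value()] += it->second;
    }
  }
  return columnPhysicalSize;
}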